process_twincat_single
Validate and automatically fix TwinCAT 3 XML files using a deterministic workflow that includes pre-checks, strict fixes, post-validation, and safety suggestions.
Instructions
Run enforced deterministic single-file TwinCAT workflow.
Steps:
1. validate_file (pre-check)
2. autofix_file (strict pipeline)
3. validate_file (post-check)
4. suggest_fixes (only if still unsafe)
Args:
- file_path: Path to the TwinCAT file to process.
- create_backup: Create a backup before applying fixes.
- validation_level: "all", "critical", or "style".
- enforcement_mode: Policy enforcement mode ("strict" or "compat").
- include_knowledge_hints: Include recommended_check_ids from blockers.
- intent_profile: Programming paradigm intent — "auto" (default), "procedural", or "oop". Controls which check families run:
  - "procedural": OOP checks are skipped (safe for plain FUNCTION_BLOCK/PROGRAM).
  - "oop": Full OOP check family is enforced.
  - "auto": Resolved from file content (EXTENDS/IMPLEMENTS → oop, else procedural).
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
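| file_path | Yes | Path to the TwinCAT file to process. | |
| create_backup | No | Create a backup before applying fixes. | False |
| validation_level | No | "all", "critical", or "style". | all |
| enforcement_mode | No | Policy enforcement mode ("strict" or "compat"). | strict |
| include_knowledge_hints | No | Include recommended_check_ids from blockers. | False |
| intent_profile | No | Programming paradigm intent: "auto", "procedural", or "oop". | auto |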
Output Schema
| Name | Required | Description | Default |
|---|---|---|---|
| result | Yes | | |
Implementation Reference
- The process_twincat_single tool implementation, which runs an enforced deterministic single-file TwinCAT workflow by coordinating validate_file and autofix_file steps.
def process_twincat_single(
    file_path: str,
    create_backup: bool = False,
    validation_level: str = "all",
    enforcement_mode: str = DEFAULT_ENFORCEMENT_MODE,
    include_knowledge_hints: bool = False,
    intent_profile: str = "auto",
) -> str:
    """Run enforced deterministic single-file TwinCAT workflow.

    Steps:
    1. validate_file (pre-check)
    2. autofix_file (strict pipeline)
    3. validate_file (post-check)
    4. suggest_fixes (only if still unsafe)

    Args:
        file_path: Path to the TwinCAT file to process.
        create_backup: Create a backup before applying fixes.
        validation_level: "all", "critical", or "style".
        enforcement_mode: Policy enforcement mode ("strict" or "compat").
        include_knowledge_hints: Include recommended_check_ids from blockers.
        intent_profile: Programming paradigm intent — "auto" (default), "procedural", or "oop".
            Controls which check families run:
            - "procedural": OOP checks are skipped (safe for plain FUNCTION_BLOCK/PROGRAM).
            - "oop": Full OOP check family is enforced.
            - "auto": Resolved from file content (EXTENDS/IMPLEMENTS → oop, else procedural).

    Returns:
        The workflow result as a string, built by ``_with_meta`` on the normal
        and step-failure paths, or by ``_tool_error`` for invalid arguments
        and unexpected exceptions.
    """
    started = time.monotonic()
    ctx = None

    def _step_failure(step_name, step_payload):
        # Payload returned when one pipeline tool reports success=False.
        return {
            "success": False,
            "file_path": file_path,
            "workflow": "single_strict_pipeline",
            "failed_step": step_name,
            "step_error": step_payload,
            "done": False,
            "terminal_mode": False,
            "next_action": "inspect_error",
        }

    try:
        mode_failure = _validate_enforcement_mode(enforcement_mode, start_time=started)
        if mode_failure:
            return mode_failure

        ctx = _resolve_execution_context(file_path, enforcement_mode=enforcement_mode)

        # Lazy imports to avoid registration-order problems.
        from twincat_validator.server import autofix_file, suggest_fixes, validate_file

        if intent_profile not in _VALID_INTENT_PROFILES:
            return _tool_error(
                f"Invalid intent_profile: {intent_profile}",
                file_path=file_path,
                start_time=started,
                execution_context=ctx,
                valid_intent_profiles=list(_VALID_INTENT_PROFILES),
            )

        known_levels = ["all", "critical", "style"]
        if validation_level not in known_levels:
            return _tool_error(
                f"Invalid validation_level: {validation_level}",
                file_path=file_path,
                start_time=started,
                execution_context=ctx,
                valid_levels=known_levels,
            )

        # The intent profile is resolved from the file content so the engine
        # can filter check categories; an unreadable file resolves from None.
        try:
            source_text = Path(file_path).read_text(encoding="utf-8", errors="replace")
        except OSError:
            source_text = None
        resolved_intent = _resolve_intent_profile(source_text, intent_profile)

        def _run_validation(profile):
            # All validate_file calls in this workflow differ only by profile.
            return validate_file(
                file_path,
                validation_level=validation_level,
                profile=profile,
                enforcement_mode=enforcement_mode,
                intent_profile=resolved_intent,
            )

        # Step 1: pre-check.
        pre_check = json.loads(_run_validation("llm_strict"))
        if not pre_check.get("success", False):
            return _with_meta(
                _step_failure("validate_file_pre", pre_check),
                started,
                execution_context=ctx,
            )

        # Step 2: strict autofix pipeline.
        fix_report = json.loads(
            autofix_file(
                file_path=file_path,
                create_backup=create_backup,
                profile="llm_strict",
                format_profile="twincat_canonical",
                strict_contract=True,
                create_implicit_files=True,
                orchestration_hints=True,
                enforcement_mode=enforcement_mode,
                intent_profile=resolved_intent,
            )
        )
        if not fix_report.get("success", False):
            return _with_meta(
                _step_failure("autofix_file", fix_report),
                started,
                execution_context=ctx,
            )

        # Step 3: post-check.
        post_check = json.loads(_run_validation("llm_strict"))
        if not post_check.get("success", False):
            return _with_meta(
                _step_failure("validate_file_post", post_check),
                started,
                execution_context=ctx,
            )

        # A flag is only set when both the autofix and the post-check agree.
        safe_to_import = bool(fix_report.get("safe_to_import")) and bool(
            post_check.get("safe_to_import")
        )
        safe_to_compile = bool(fix_report.get("safe_to_compile")) and bool(
            post_check.get("safe_to_compile")
        )
        done = safe_to_import and safe_to_compile
        blockers = fix_report.get("blockers", []) or []

        if resolved_intent == "oop":
            categories_run = ["core", "oop"]
        else:
            categories_run = ["core"]

        compliance_warnings = _collect_intent_mismatch_warnings(
            resolved_intent,
            steps=[
                ("validate_file_pre", pre_check),
                ("autofix_file", fix_report),
                ("validate_file_post", post_check),
            ],
        )

        result = {
            "success": True,
            "file_path": file_path,
            "workflow": "single_strict_pipeline",
            "tools_used": [
                "validate_file",
                "autofix_file",
                "validate_file",
            ],
            "intent_profile_requested": intent_profile,
            "intent_profile_resolved": resolved_intent,
            "check_categories_executed": categories_run,
            "workflow_compliance_warnings": compliance_warnings,
            "safe_to_import": safe_to_import,
            "safe_to_compile": safe_to_compile,
            "pre_validation": pre_check,
            "autofix": fix_report,
            "post_validation": post_check,
            "done": done,
            "status": "done" if done else "blocked",
            "blocking_count": int(fix_report.get("blocking_count", 0) or 0),
            "blockers": blockers,
            "effective_oop_policy": {
                "policy_source": ctx.policy_source,
                "policy": ctx.effective_oop_policy,
            },
        }

        derived_action, terminal = _derive_next_action(
            safe_to_import=safe_to_import,
            safe_to_compile=safe_to_compile,
            blockers=blockers,
            no_change_detected=bool(fix_report.get("no_change_detected", False)),
            no_progress_count=int(fix_report.get("no_progress_count", 0) or 0),
            contract_failed=fix_report.get("contract_passed") is False,
        )

        # When the file is fully clean, the workflow is always terminal.
        # _derive_next_action returns terminal=False for "done" (meaning
        # "not stuck"), but the single-file contract means terminal=True
        # when done=True.
        if done:
            result["terminal_mode"] = True
        else:
            result["terminal_mode"] = terminal
        result["next_action"] = derived_action
        result["allow_followup_autofix_without_user_request"] = False

        # Step 4: suggestions are only gathered while the file is still unsafe.
        if not done:
            full_report = _run_validation("full")
            suggestions = json.loads(suggest_fixes(full_report))
            result["tools_used"].append("suggest_fixes")
            result["suggested_fixes"] = suggestions
            if include_knowledge_hints:
                result["recommended_check_ids"] = sorted(
                    {b["check_id"] for b in blockers if b.get("check_id")}
                )

        _assert_orchestration_contract(result, is_batch=False)
        return _with_meta(result, started, execution_context=ctx)
    except Exception as e:
        # Tool boundary: any unexpected failure is reported as a tool error
        # instead of propagating to the caller.
        error_kwargs = {"execution_context": ctx}
        if ctx is None:
            error_kwargs.update(unresolved_policy_fields(enforcement_mode))
        return _tool_error(str(e), file_path=file_path, start_time=started, **error_kwargs)