Skip to main content
Glama
execution_pipeline.py16.1 kB
"""Execution Pipeline - Manages autonomous tool execution flow""" import queue import threading import time from datetime import datetime from enum import Enum from typing import Any, Callable, Dict, List import FreeCAD import FreeCADGui class StepStatus(Enum): """Status of an execution step""" PENDING = "pending" RUNNING = "running" COMPLETED = "completed" FAILED = "failed" SKIPPED = "skipped" ROLLED_BACK = "rolled_back" class ExecutionPipeline: """ Manages the execution of tool operations in a controlled pipeline with support for rollback, validation, and progress tracking. """ def __init__(self, agent_manager): """ Initialize the Execution Pipeline Args: agent_manager: Reference to the parent AgentManager """ self.agent_manager = agent_manager self.current_execution = None self.execution_thread = None self.should_stop = False self.is_paused = False self.execution_queue = queue.Queue() # Execution tracking self.step_results = [] self.undo_stack = [] self.execution_log = [] # Callbacks self.step_callbacks = { "on_step_start": [], "on_step_complete": [], "on_step_failed": [], "on_progress": [], } # Safety settings self.safety_config = { "max_execution_time": 300, # 5 minutes per step "validate_before_execute": True, "auto_save_before_execute": True, "create_checkpoints": True, "rollback_on_failure": True, } def execute(self, plan: Dict) -> Dict: """ Execute a complete plan Args: plan: Execution plan with steps Returns: Execution result dictionary """ # Initialize execution self.current_execution = { "plan_id": plan["id"], "start_time": datetime.now(), "status": "running", "current_step": 0, "total_steps": len(plan["steps"]), "results": [], } # Reset tracking self.step_results = [] self.should_stop = False self.is_paused = False # Create checkpoint if enabled if self.safety_config["create_checkpoints"]: self._create_checkpoint() # Auto-save if enabled if self.safety_config["auto_save_before_execute"]: self._auto_save_document() # Execute in separate thread self.execution_thread = threading.Thread( target=self._execute_plan_thread, args=(plan,) ) self.execution_thread.start() self.execution_thread.join() # Wait for completion # Prepare final result result = { "success": self.current_execution["status"] == "completed", "executed_steps": self.step_results, "failed_step": None, "outputs": [], "errors": [], "duration": ( datetime.now() - self.current_execution["start_time"] ).total_seconds(), } # Find failed step if any for step_result in self.step_results: if step_result["status"] == StepStatus.FAILED: result["failed_step"] = step_result["step"] result["errors"].append(step_result.get("error", "Unknown error")) break elif step_result["status"] == StepStatus.COMPLETED: if "output" in step_result: result["outputs"].append(step_result["output"]) return result def _execute_plan_thread(self, plan: Dict): """Execute plan in separate thread""" try: for i, step in enumerate(plan["steps"]): if self.should_stop: self._log("Execution stopped by user") self.current_execution["status"] = "cancelled" break while self.is_paused: time.sleep(0.1) if self.should_stop: break self.current_execution["current_step"] = i + 1 # Execute step step_result = self._execute_step(step, i + 1, len(plan["steps"])) self.step_results.append(step_result) # Check if step failed if step_result["status"] == StepStatus.FAILED: self._log( f"Step {i + 1} failed: {step_result.get('error', 'Unknown error')}" ) # Attempt rollback if enabled if self.safety_config["rollback_on_failure"]: self._rollback_to_checkpoint() self.current_execution["status"] = "failed" break else: # All steps completed successfully self.current_execution["status"] = "completed" self._log("Execution completed successfully") except Exception as e: self._log(f"Execution error: {str(e)}") self.current_execution["status"] = "error" def _execute_step(self, step: Dict, step_num: int, total_steps: int) -> Dict: """Execute a single step""" step_result = { "step": step, "step_number": step_num, "status": StepStatus.PENDING, "start_time": datetime.now(), "duration": 0, "output": None, "error": None, } try: # Notify step start self._trigger_step_callback("on_step_start", step_num, total_steps, step) # Validate step if enabled if self.safety_config["validate_before_execute"]: validation = self._validate_step(step) if not validation["valid"]: raise ValueError(f"Step validation failed: {validation['reason']}") # Update status step_result["status"] = StepStatus.RUNNING # Get tool from registry tool = self._get_tool(step["tool_id"]) if not tool: raise ValueError(f"Tool not found: {step['tool_id']}") # Prepare parameters parameters = self._prepare_parameters(step, tool) # Execute tool with timeout self._log(f"Executing {step['tool']} with parameters: {parameters}") # Record state before execution for potential rollback before_state = self._capture_state() self.undo_stack.append( { "step": step_num, "before_state": before_state, "tool": tool, "parameters": parameters, } ) # Execute the tool if hasattr(tool, "execute"): result = self._execute_with_timeout( lambda: tool.execute(**parameters), timeout=self.safety_config["max_execution_time"], ) step_result["output"] = result else: # Fallback execution method result = self._execute_tool_fallback(tool, parameters) step_result["output"] = result # Update status step_result["status"] = StepStatus.COMPLETED step_result["duration"] = ( datetime.now() - step_result["start_time"] ).total_seconds() # Notify completion self._trigger_step_callback( "on_step_complete", step_num, total_steps, step_result ) # Update progress progress = step_num / total_steps self._trigger_step_callback("on_progress", progress, step_num, total_steps) except Exception as e: step_result["status"] = StepStatus.FAILED step_result["error"] = str(e) step_result["duration"] = ( datetime.now() - step_result["start_time"] ).total_seconds() # Notify failure self._trigger_step_callback( "on_step_failed", step_num, total_steps, step_result ) self._log(f"Step execution failed: {str(e)}") return step_result def _validate_step(self, step: Dict) -> Dict[str, Any]: """Validate a step before execution""" validation = {"valid": True, "reason": ""} # Check if tool exists if "tool_id" not in step: validation["valid"] = False validation["reason"] = "No tool_id specified" return validation tool = self._get_tool(step["tool_id"]) if not tool: validation["valid"] = False validation["reason"] = f"Tool not found: {step['tool_id']}" return validation # Validate parameters if hasattr(tool, "validate_parameters"): param_validation = tool.validate_parameters(step.get("parameters", {})) if not param_validation.get("valid", True): validation["valid"] = False validation["reason"] = param_validation.get( "reason", "Parameter validation failed" ) # Check dependencies if "dependencies" in step and step["dependencies"]: for dep_step in step["dependencies"]: # Check if dependency was completed successfully dep_completed = any( r["step_number"] == dep_step and r["status"] == StepStatus.COMPLETED for r in self.step_results ) if not dep_completed: validation["valid"] = False validation["reason"] = f"Dependency step {dep_step} not completed" break return validation def _get_tool(self, tool_id: str): """Get tool from registry""" if self.agent_manager and self.agent_manager.tool_registry: return self.agent_manager.tool_registry.get_tool(tool_id) return None def _prepare_parameters(self, step: Dict, tool) -> Dict[str, Any]: """Prepare parameters for tool execution""" parameters = step.get("parameters", {}).copy() # Apply default parameters if not specified if hasattr(tool, "get_default_parameters"): defaults = tool.get_default_parameters() for key, value in defaults.items(): if key not in parameters: parameters[key] = value # Convert parameter types if needed if hasattr(tool, "convert_parameters"): parameters = tool.convert_parameters(parameters) return parameters def _execute_tool_fallback(self, tool, parameters: Dict) -> Any: """Fallback execution method for tools without execute method""" # Try different execution methods if hasattr(tool, "run"): return tool.run(**parameters) elif hasattr(tool, "apply"): return tool.apply(**parameters) elif hasattr(tool, "__call__"): return tool(**parameters) else: raise ValueError(f"Tool {tool} has no known execution method") def _execute_with_timeout(self, func: Callable, timeout: float) -> Any: """Execute function with timeout""" result = [None] exception = [None] def target(): try: result[0] = func() except Exception as e: exception[0] = e thread = threading.Thread(target=target) thread.start() thread.join(timeout) if thread.is_alive(): # Timeout occurred raise TimeoutError(f"Execution timed out after {timeout} seconds") if exception[0]: raise exception[0] return result[0] def _create_checkpoint(self): """Create a checkpoint for potential rollback""" try: # Save current document state doc = FreeCAD.ActiveDocument if doc: checkpoint_name = ( f"checkpoint_{datetime.now().strftime('%Y%m%d_%H%M%S')}" ) # Store in memory for now, could save to temp file self._checkpoint_data = { "name": checkpoint_name, "timestamp": datetime.now(), "document_state": self._capture_state(), } self._log(f"Created checkpoint: {checkpoint_name}") except Exception as e: self._log(f"Failed to create checkpoint: {str(e)}") def _rollback_to_checkpoint(self): """Rollback to the last checkpoint""" try: if hasattr(self, "_checkpoint_data") and self._checkpoint_data: # In a real implementation, this would restore the document state self._log( f"Rolling back to checkpoint: {self._checkpoint_data['name']}" ) # Use undo stack for rollback while self.undo_stack: undo_info = self.undo_stack.pop() # Perform undo operation if FreeCADGui: FreeCAD.ActiveDocument.undo() except Exception as e: self._log(f"Rollback failed: {str(e)}") def _capture_state(self) -> Dict: """Capture current FreeCAD document state""" state = {"timestamp": datetime.now().isoformat(), "objects": []} try: doc = FreeCAD.ActiveDocument if doc: for obj in doc.Objects: state["objects"].append( {"name": obj.Name, "type": obj.TypeId, "label": obj.Label} ) except Exception as e: self._log(f"Failed to capture state: {str(e)}") return state def _auto_save_document(self): """Auto-save the document before execution""" try: doc = FreeCAD.ActiveDocument if doc and doc.FileName: doc.save() self._log("Document auto-saved") except Exception as e: self._log(f"Auto-save failed: {str(e)}") def pause(self): """Pause execution""" self.is_paused = True self._log("Execution paused") def resume(self): """Resume execution""" self.is_paused = False self._log("Execution resumed") def stop(self): """Stop execution""" self.should_stop = True self._log("Execution stop requested") def register_step_callback(self, event: str, callback: Callable): """Register a callback for step events""" if event in self.step_callbacks: self.step_callbacks[event].append(callback) def _trigger_step_callback(self, event: str, *args, **kwargs): """Trigger callbacks for a step event""" if event in self.step_callbacks: for callback in self.step_callbacks[event]: try: callback(*args, **kwargs) except Exception as e: self._log(f"Callback error: {str(e)}") def _log(self, message: str): """Log execution message""" log_entry = {"timestamp": datetime.now().isoformat(), "message": message} self.execution_log.append(log_entry) FreeCAD.Console.PrintMessage(f"Execution Pipeline: {message}\n") def get_execution_log(self) -> List[Dict]: """Get execution log""" return self.execution_log.copy() def update_safety_config(self, config: Dict): """Update safety configuration""" self.safety_config.update(config) self._log(f"Safety config updated: {config}")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jango-blockchained/mcp-freecad'

If you have feedback or need assistance with the MCP directory API, please join our Discord server