/**
* Advanced Workflow Orchestration Engine
* Handles complex multi-step automation workflows with conditional execution,
* parallel processing, and error handling
*/
import { EventEmitter } from 'events';
import { v4 as uuidv4 } from 'uuid';
import PQueue from 'p-queue';
import {
WorkflowDefinition,
WorkflowExecution,
WorkflowStatus,
TaskExecution,
TaskStatus,
ExecutionContext,
TaskCondition,
StateMachine,
State,
Transition,
ApprovalExecution,
ExecutionError,
ExecutionLog,
WorkflowTask,
ParallelExecution,
LoopConfiguration,
RetryPolicy,
} from '../types/workflow.js';
import { ConsoleManager } from './ConsoleManager.js';
import { Logger } from '../utils/logger.js';
export class WorkflowEngine extends EventEmitter {
private executions: Map<string, WorkflowExecution>;
private workflows: Map<string, WorkflowDefinition>;
private stateMachines: Map<string, StateMachine>;
private taskQueues: Map<string, PQueue>;
private consoleManager: ConsoleManager;
private logger: Logger;
private approvalCallbacks: Map<
string,
(approved: boolean, comments?: string) => void
>;
constructor(consoleManager: ConsoleManager) {
super();
this.executions = new Map();
this.workflows = new Map();
this.stateMachines = new Map();
this.taskQueues = new Map();
this.consoleManager = consoleManager;
this.logger = new Logger('WorkflowEngine');
this.approvalCallbacks = new Map();
}
/**
* Register a workflow definition
*/
registerWorkflow(definition: WorkflowDefinition): void {
this.workflows.set(definition.id, definition);
this.logger.info(
`Workflow registered: ${definition.name} (${definition.id})`
);
this.emit('workflow-registered', definition);
}
/**
* Execute a workflow with given context
*/
async executeWorkflow(
workflowId: string,
context: ExecutionContext,
variables?: Record<string, any>
): Promise<string> {
const workflow = this.workflows.get(workflowId);
if (!workflow) {
throw new Error(`Workflow not found: ${workflowId}`);
}
const executionId = uuidv4();
const execution: WorkflowExecution = {
id: executionId,
workflowId,
status: 'pending',
startTime: new Date(),
triggeredBy: {
type: 'manual',
source: 'api',
timestamp: new Date(),
},
context,
tasks: [],
approvals: [],
variables: {
...workflow.variables.reduce(
(acc, v) => ({ ...acc, [v.name]: v.defaultValue }),
{}
),
...variables,
},
artifacts: [],
metrics: {
totalTasks: workflow.tasks.length,
completedTasks: 0,
failedTasks: 0,
skippedTasks: 0,
avgTaskDuration: 0,
resourceUsage: {
peakMemory: 0,
avgCpu: 0,
diskUsed: 0,
networkTraffic: 0,
},
performance: {
queueTime: 0,
executionTime: 0,
waitTime: 0,
overhead: 0,
},
},
logs: [],
errors: [],
};
this.executions.set(executionId, execution);
this.createTaskQueue(executionId);
try {
await this.startExecution(execution, workflow);
return executionId;
} catch (error: any) {
await this.handleExecutionError(execution, error);
throw error;
}
}
/**
* Start workflow execution
*/
private async startExecution(
execution: WorkflowExecution,
workflow: WorkflowDefinition
): Promise<void> {
this.updateExecutionStatus(execution, 'running');
this.addExecutionLog(
execution,
'info',
'workflow',
`Starting workflow: ${workflow.name}`
);
try {
// Initialize workflow variables
await this.initializeVariables(execution, workflow);
// Check if workflow uses state machine
if (workflow.metadata.dependencies?.includes('state-machine')) {
await this.executeStateMachineWorkflow(execution, workflow);
} else {
await this.executeSequentialWorkflow(execution, workflow);
}
} catch (error: any) {
await this.handleExecutionError(execution, error);
throw error;
}
}
/**
* Execute workflow using state machine approach
*/
private async executeStateMachineWorkflow(
execution: WorkflowExecution,
workflow: WorkflowDefinition
): Promise<void> {
const stateMachine = this.createStateMachine(workflow);
this.stateMachines.set(execution.id, stateMachine);
let currentState = stateMachine.states.find((s) => s.type === 'initial');
if (!currentState) {
throw new Error('No initial state found in state machine');
}
while (currentState && currentState.type !== 'final') {
this.addExecutionLog(
execution,
'info',
'state-machine',
`Entering state: ${currentState.name}`
);
// Execute tasks in current state
if (currentState.tasks) {
await this.executeStateTasks(execution, workflow, currentState);
}
// Find next transition
const transition = await this.findValidTransition(
execution,
stateMachine,
currentState.id
);
if (!transition) {
if (currentState.type === 'error') {
throw new Error(
`Workflow stuck in error state: ${currentState.name}`
);
}
break;
}
// Execute transition actions
if (transition.actions) {
await this.executeTransitionActions(execution, transition.actions);
}
currentState = stateMachine.states.find((s) => s.id === transition.to);
}
this.updateExecutionStatus(execution, 'completed');
}
/**
* Execute workflow sequentially with dependency resolution
*/
private async executeSequentialWorkflow(
execution: WorkflowExecution,
workflow: WorkflowDefinition
): Promise<void> {
const taskGraph = this.buildTaskDependencyGraph(workflow.tasks);
const readyTasks = this.getReadyTasks(taskGraph, new Set());
const completedTasks = new Set<string>();
while (readyTasks.length > 0 || this.hasPendingTasks(execution)) {
// Execute ready tasks
const taskPromises = readyTasks.map((task) =>
this.executeTask(execution, workflow, task)
);
const results = await Promise.allSettled(taskPromises);
// Process results
for (let i = 0; i < results.length; i++) {
const task = readyTasks[i];
const result = results[i];
if (result.status === 'fulfilled') {
completedTasks.add(task.id);
execution.metrics.completedTasks++;
} else {
execution.metrics.failedTasks++;
await this.handleTaskError(execution, task, result.reason);
if (workflow.errorHandling.global.onError === 'fail') {
throw result.reason;
}
}
}
// Find new ready tasks
readyTasks.length = 0;
readyTasks.push(...this.getReadyTasks(taskGraph, completedTasks));
// Check for deadlock
if (
readyTasks.length === 0 &&
!this.allTasksCompleted(taskGraph, completedTasks)
) {
throw new Error(
'Workflow deadlock detected - circular dependencies or missing tasks'
);
}
}
this.updateExecutionStatus(execution, 'completed');
}
/**
* Execute a single task with all its features
*/
private async executeTask(
execution: WorkflowExecution,
workflow: WorkflowDefinition,
task: WorkflowTask
): Promise<void> {
const taskExecution: TaskExecution = {
id: uuidv4(),
taskId: task.id,
status: 'pending',
attempts: 0,
input: task.input,
metrics: {
duration: 0,
memoryUsage: 0,
cpuUsage: 0,
diskIO: 0,
networkIO: 0,
outputSize: 0,
},
logs: [],
};
execution.tasks.push(taskExecution);
this.emit('task-started', execution.id, taskExecution);
try {
// Check task condition
if (
task.condition &&
!(await this.evaluateCondition(execution, task.condition))
) {
taskExecution.status = 'skipped';
execution.metrics.skippedTasks++;
this.addExecutionLog(
execution,
'info',
'task',
`Task skipped: ${task.name} (condition not met)`
);
return;
}
// Handle approval requirement
if (task.approval) {
await this.handleTaskApproval(execution, task);
}
// Execute based on task type and execution mode
taskExecution.status = 'running';
taskExecution.startTime = new Date();
if (task.executionMode === 'parallel' && task.parallel) {
await this.executeParallelTask(
execution,
workflow,
task,
taskExecution
);
} else if (task.loop) {
await this.executeLoopTask(execution, workflow, task, taskExecution);
} else {
await this.executeSingleTask(execution, workflow, task, taskExecution);
}
taskExecution.endTime = new Date();
taskExecution.duration =
taskExecution.endTime.getTime() - taskExecution.startTime!.getTime();
taskExecution.status = 'completed';
// Execute success actions
if (task.onSuccess) {
await this.executeTaskActions(execution, task.onSuccess);
}
} catch (error: any) {
taskExecution.status = 'failed';
taskExecution.error = {
timestamp: new Date(),
source: task.id,
type: error.constructor.name,
message: error.message,
stack: error.stack,
context: { taskId: task.id, input: task.input },
recoverable: this.isRecoverableError(error),
};
// Handle retry logic
const retryPolicy =
task.retryPolicy || workflow.errorHandling.global.retryPolicy;
if (retryPolicy && taskExecution.attempts < retryPolicy.maxAttempts) {
await this.retryTask(
execution,
workflow,
task,
taskExecution,
retryPolicy
);
return;
}
// Execute failure actions
if (task.onFailure) {
await this.executeTaskActions(execution, task.onFailure);
}
throw error;
} finally {
this.emit('task-completed', execution.id, taskExecution);
}
}
/**
* Execute parallel tasks with concurrency control
*/
private async executeParallelTask(
execution: WorkflowExecution,
workflow: WorkflowDefinition,
task: WorkflowTask,
taskExecution: TaskExecution
): Promise<void> {
if (!task.parallel) return;
const queue = new PQueue({
concurrency: task.parallel.maxConcurrency,
});
const subtasks = this.generateParallelSubtasks(task);
const results: any[] = [];
try {
const promises = subtasks.map((subtask, index) =>
queue.add(async () => {
try {
const result = await this.executeSingleTask(
execution,
workflow,
subtask,
taskExecution
);
results[index] = result;
return result;
} catch (error) {
if (task.parallel!.failFast) {
queue.clear();
throw error;
}
results[index] = { error: error.message };
return null;
}
})
);
await Promise.all(promises);
// Aggregate results based on strategy
taskExecution.output = this.aggregateParallelResults(
results,
task.parallel.aggregationStrategy
);
} finally {
queue.clear();
}
}
/**
* Execute task with loop configuration
*/
private async executeLoopTask(
execution: WorkflowExecution,
workflow: WorkflowDefinition,
task: WorkflowTask,
taskExecution: TaskExecution
): Promise<void> {
if (!task.loop) return;
const results: any[] = [];
let iteration = 0;
while (iteration < task.loop.maxIterations) {
// Check loop condition
if (
task.loop.condition &&
!(await this.evaluateCondition(execution, task.loop.condition))
) {
break;
}
// Check break condition
if (
task.loop.breakCondition &&
(await this.evaluateCondition(execution, task.loop.breakCondition))
) {
break;
}
// Set loop variables
if (task.loop.type === 'foreach' && task.loop.items) {
const items = execution.variables[task.loop.items] as any[];
if (!items || iteration >= items.length) break;
execution.variables['loop_item'] = items[iteration];
}
execution.variables['loop_index'] = iteration;
// Execute task iteration
try {
const result = await this.executeSingleTask(
execution,
workflow,
task,
taskExecution
);
results.push(result);
} catch (error: any) {
if (workflow.errorHandling.global.onError === 'fail') {
throw error;
}
results.push({ error: error.message, iteration });
}
iteration++;
// For 'for' loop, check iterations
if (
task.loop.type === 'for' &&
task.loop.iterations &&
iteration >= task.loop.iterations
) {
break;
}
}
taskExecution.output = { iterations: iteration, results };
}
/**
* Execute a single task based on its type
*/
private async executeSingleTask(
execution: WorkflowExecution,
workflow: WorkflowDefinition,
task: WorkflowTask,
taskExecution: TaskExecution
): Promise<any> {
const startTime = Date.now();
try {
let result: any;
switch (task.type) {
case 'command':
result = await this.executeCommandTask(
execution,
task,
taskExecution
);
break;
case 'script':
result = await this.executeScriptTask(execution, task, taskExecution);
break;
case 'api_call':
result = await this.executeApiCallTask(
execution,
task,
taskExecution
);
break;
case 'file_operation':
result = await this.executeFileOperationTask(
execution,
task,
taskExecution
);
break;
case 'condition':
result = await this.executeConditionTask(
execution,
task,
taskExecution
);
break;
case 'wait':
result = await this.executeWaitTask(execution, task, taskExecution);
break;
case 'notification':
result = await this.executeNotificationTask(
execution,
task,
taskExecution
);
break;
case 'subworkflow':
result = await this.executeSubworkflowTask(
execution,
task,
taskExecution
);
break;
default:
throw new Error(`Unsupported task type: ${task.type}`);
}
// Update task output
if (task.output && result) {
if (task.output.variables) {
Object.entries(task.output.variables).forEach(([key, expression]) => {
execution.variables[key] = this.evaluateExpression(
expression,
result,
execution.variables
);
});
}
}
taskExecution.metrics.duration = Date.now() - startTime;
return result;
} catch (error: any) {
taskExecution.metrics.duration = Date.now() - startTime;
this.addExecutionLog(
execution,
'error',
'task',
`Task failed: ${task.name} - ${error.message}`
);
throw error;
}
}
/**
* Execute command task using ConsoleManager
*/
private async executeCommandTask(
execution: WorkflowExecution,
task: WorkflowTask,
taskExecution: TaskExecution
): Promise<any> {
if (!task.input.command) {
throw new Error('Command task requires command in input');
}
const command = this.interpolateVariables(
task.input.command,
execution.variables
);
const args = task.input.args?.map((arg) =>
this.interpolateVariables(arg, execution.variables)
);
this.addExecutionLog(
execution,
'info',
'task',
`Executing command: ${command} ${args?.join(' ') || ''}`
);
try {
const result = await this.consoleManager.executeCommand(command, args, {
cwd: task.input.variables?.cwd as string,
env: task.input.variables?.env as Record<string, string>,
timeout: task.timeout,
detectErrors: true,
});
taskExecution.output = result;
taskExecution.exitCode = result.exitCode;
return result;
} catch (error: any) {
throw new Error(`Command execution failed: ${error.message}`);
}
}
/**
* Execute API call task
*/
private async executeApiCallTask(
execution: WorkflowExecution,
task: WorkflowTask,
taskExecution: TaskExecution
): Promise<any> {
if (!task.input.url) {
throw new Error('API call task requires URL in input');
}
const url = this.interpolateVariables(task.input.url, execution.variables);
const method = task.input.method || 'GET';
const headers = task.input.headers || {};
const body = task.input.body;
this.addExecutionLog(
execution,
'info',
'task',
`Making API call: ${method} ${url}`
);
try {
const response = await fetch(url, {
method,
headers,
body: body ? JSON.stringify(body) : undefined,
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
const result = await response.json();
taskExecution.output = result;
return result;
} catch (error: any) {
throw new Error(`API call failed: ${error.message}`);
}
}
/**
* Execute condition task
*/
private async executeConditionTask(
execution: WorkflowExecution,
task: WorkflowTask,
taskExecution: TaskExecution
): Promise<any> {
if (!task.condition) {
throw new Error('Condition task requires condition configuration');
}
const result = await this.evaluateCondition(execution, task.condition);
taskExecution.output = { result };
this.addExecutionLog(
execution,
'info',
'task',
`Condition evaluated to: ${result}`
);
return { result };
}
/**
* Execute wait task
*/
private async executeWaitTask(
execution: WorkflowExecution,
task: WorkflowTask,
taskExecution: TaskExecution
): Promise<any> {
const duration = (task.input.variables?.duration as number) || 1000;
this.addExecutionLog(
execution,
'info',
'task',
`Waiting for ${duration}ms`
);
await new Promise((resolve) => setTimeout(resolve, duration));
return { waited: duration };
}
/**
* Execute notification task
*/
private async executeNotificationTask(
execution: WorkflowExecution,
task: WorkflowTask,
taskExecution: TaskExecution
): Promise<any> {
const message = this.interpolateVariables(
(task.input.variables?.message as string) || 'Workflow notification',
execution.variables
);
this.addExecutionLog(
execution,
'info',
'task',
`Sending notification: ${message}`
);
// Emit notification event
this.emit('notification', {
executionId: execution.id,
taskId: task.id,
message,
timestamp: new Date(),
});
return { sent: true, message };
}
/**
* Execute subworkflow task
*/
private async executeSubworkflowTask(
execution: WorkflowExecution,
task: WorkflowTask,
taskExecution: TaskExecution
): Promise<any> {
const subworkflowId = task.input.variables?.workflowId as string;
if (!subworkflowId) {
throw new Error('Subworkflow task requires workflowId');
}
this.addExecutionLog(
execution,
'info',
'task',
`Executing subworkflow: ${subworkflowId}`
);
const subExecutionId = await this.executeWorkflow(
subworkflowId,
{ ...execution.context },
task.input.variables as Record<string, any>
);
// Wait for completion
const subExecution = await this.waitForExecution(subExecutionId);
return {
executionId: subExecutionId,
status: subExecution.status,
output: subExecution.variables,
};
}
// ... Additional helper methods
/**
* Evaluate task condition
*/
private async evaluateCondition(
execution: WorkflowExecution,
condition: TaskCondition
): Promise<boolean> {
switch (condition.type) {
case 'expression':
return this.evaluateExpression(
condition.expression!,
null,
execution.variables
) as boolean;
case 'variable':
const variable = execution.variables[condition.variables![0]];
return this.compareValues(
variable,
condition.operator!,
condition.value
);
case 'previous_task':
const taskResult = execution.tasks.find(
(t) => t.taskId === condition.variables![0]
);
return taskResult?.status === 'completed';
default:
return true;
}
}
private compareValues(actual: any, operator: string, expected: any): boolean {
switch (operator) {
case 'eq':
return actual === expected;
case 'ne':
return actual !== expected;
case 'gt':
return actual > expected;
case 'lt':
return actual < expected;
case 'gte':
return actual >= expected;
case 'lte':
return actual <= expected;
case 'contains':
return String(actual).includes(String(expected));
case 'matches':
return new RegExp(expected).test(String(actual));
case 'exists':
return actual !== undefined && actual !== null;
default:
return false;
}
}
/**
* Interpolate variables in string templates
*/
private interpolateVariables(
template: string,
variables: Record<string, any>
): string {
return template.replace(/\{\{(\w+)\}\}/g, (match, key) => {
return variables[key] ?? match;
});
}
/**
* Evaluate expression with context
*/
private evaluateExpression(
expression: string,
data: any,
variables: Record<string, any>
): any {
// Simple expression evaluation - in production, use a proper expression engine
try {
const context = { ...variables, data };
const func = new Function(
...Object.keys(context),
`return ${expression}`
);
return func(...Object.values(context));
} catch {
return false;
}
}
/**
* Update execution status and emit events
*/
private updateExecutionStatus(
execution: WorkflowExecution,
status: WorkflowStatus
): void {
execution.status = status;
if (
status === 'completed' ||
status === 'failed' ||
status === 'cancelled'
) {
execution.endTime = new Date();
execution.duration =
execution.endTime.getTime() - execution.startTime.getTime();
}
this.emit('execution-status-changed', execution.id, status);
}
/**
* Add execution log entry
*/
private addExecutionLog(
execution: WorkflowExecution,
level: 'debug' | 'info' | 'warn' | 'error',
source: string,
message: string,
data?: any
): void {
const log: ExecutionLog = {
timestamp: new Date(),
level,
source,
message,
data,
};
execution.logs.push(log);
this.logger[level](`[${execution.id}] ${message}`, data);
}
/**
* Execute task actions (onSuccess, onError, etc.)
*/
private async executeTaskActions(
execution: WorkflowExecution,
actions: any[]
): Promise<void> {
for (const action of actions) {
this.addExecutionLog(
execution,
'info',
'task-action',
`Executing action: ${action.type || 'custom'}`
);
// Implementation would depend on action types defined in workflow
}
}
// ... Additional utility methods for task queues, dependency graphs, approvals, etc.
/**
* Get execution by ID
*/
getExecution(executionId: string): WorkflowExecution | undefined {
return this.executions.get(executionId);
}
/**
* Cancel workflow execution
*/
async cancelExecution(executionId: string): Promise<void> {
const execution = this.executions.get(executionId);
if (!execution) {
throw new Error(`Execution not found: ${executionId}`);
}
this.updateExecutionStatus(execution, 'cancelled');
this.addExecutionLog(
execution,
'info',
'workflow',
'Workflow cancelled by user'
);
// Cancel running tasks
const runningTasks = execution.tasks.filter((t) => t.status === 'running');
for (const task of runningTasks) {
if (task.sessionId) {
await this.consoleManager.stopSession(task.sessionId);
}
}
this.emit('execution-cancelled', executionId);
}
/**
* Get all executions
*/
getAllExecutions(): WorkflowExecution[] {
return Array.from(this.executions.values());
}
/**
* Clean up completed executions
*/
cleanup(olderThanHours: number = 24): void {
const cutoff = Date.now() - olderThanHours * 60 * 60 * 1000;
Array.from(this.executions.entries()).forEach(([id, execution]) => {
if (execution.endTime && execution.endTime.getTime() < cutoff) {
this.executions.delete(id);
this.taskQueues.delete(id);
this.stateMachines.delete(id);
}
});
}
// Additional private helper methods would go here...
private createTaskQueue(executionId: string): void {
this.taskQueues.set(executionId, new PQueue({ concurrency: 5 }));
}
private async initializeVariables(
execution: WorkflowExecution,
workflow: WorkflowDefinition
): Promise<void> {
// Initialize workflow variables from definition
for (const variable of workflow.variables) {
if (!(variable.name in execution.variables)) {
execution.variables[variable.name] = variable.defaultValue;
}
}
}
private buildTaskDependencyGraph(
tasks: WorkflowTask[]
): Map<string, WorkflowTask> {
return new Map(tasks.map((task) => [task.id, task]));
}
private getReadyTasks(
taskGraph: Map<string, WorkflowTask>,
completed: Set<string>
): WorkflowTask[] {
const ready: WorkflowTask[] = [];
Array.from(taskGraph.values()).forEach((task) => {
const allDependenciesCompleted = task.dependsOn.every((dep) =>
completed.has(dep)
);
if (allDependenciesCompleted && !completed.has(task.id)) {
ready.push(task);
}
});
return ready;
}
private hasPendingTasks(execution: WorkflowExecution): boolean {
return execution.tasks.some(
(t) => t.status === 'running' || t.status === 'pending'
);
}
private allTasksCompleted(
taskGraph: Map<string, WorkflowTask>,
completed: Set<string>
): boolean {
return taskGraph.size === completed.size;
}
private createStateMachine(workflow: WorkflowDefinition): StateMachine {
// Create a basic state machine from workflow tasks
// In practice, this would be more sophisticated
return {
id: workflow.id,
name: workflow.name,
initialState: 'start',
states: [
{ id: 'start', name: 'Start', type: 'initial', tasks: [] },
{ id: 'end', name: 'End', type: 'final', tasks: [] },
],
transitions: [
{ id: 'start-to-end', from: 'start', to: 'end', actions: [] },
],
context: {},
};
}
private async executeStateTasks(
execution: WorkflowExecution,
workflow: WorkflowDefinition,
state: State
): Promise<void> {
if (!state.tasks) return;
for (const taskId of state.tasks) {
const task = workflow.tasks.find((t) => t.id === taskId);
if (task) {
await this.executeTask(execution, workflow, task);
}
}
}
private async findValidTransition(
execution: WorkflowExecution,
stateMachine: StateMachine,
currentStateId: string
): Promise<Transition | undefined> {
const transitions = stateMachine.transitions.filter(
(t) => t.from === currentStateId
);
for (const transition of transitions) {
if (
!transition.condition ||
(await this.evaluateCondition(execution, transition.condition))
) {
return transition;
}
}
return undefined;
}
private async executeTransitionActions(
execution: WorkflowExecution,
actions: any[]
): Promise<void> {
// Execute state transition actions
for (const action of actions) {
// Implementation depends on action types
this.addExecutionLog(
execution,
'info',
'state-machine',
`Executing transition action: ${action.type}`
);
}
}
private async handleTaskError(
execution: WorkflowExecution,
task: WorkflowTask,
error: any
): Promise<void> {
const executionError: ExecutionError = {
timestamp: new Date(),
source: task.id,
type: error.constructor.name,
message: error.message,
stack: error.stack,
context: { taskId: task.id },
recoverable: this.isRecoverableError(error),
};
execution.errors.push(executionError);
this.addExecutionLog(
execution,
'error',
'task',
`Task error: ${task.name} - ${error.message}`
);
}
private async handleExecutionError(
execution: WorkflowExecution,
error: any
): Promise<void> {
this.updateExecutionStatus(execution, 'failed');
this.addExecutionLog(
execution,
'error',
'workflow',
`Workflow failed: ${error.message}`
);
}
private isRecoverableError(error: any): boolean {
// Implement logic to determine if error is recoverable
return error.code !== 'FATAL';
}
private generateParallelSubtasks(task: WorkflowTask): WorkflowTask[] {
// Generate subtasks for parallel execution
// This is a simplified implementation
return [task]; // In practice, split task into parallel subtasks
}
private aggregateParallelResults(results: any[], strategy: string): any {
switch (strategy) {
case 'array':
return results;
case 'first':
return results[0];
case 'last':
return results[results.length - 1];
case 'merge':
return Object.assign({}, ...results);
default:
return results;
}
}
private async retryTask(
execution: WorkflowExecution,
workflow: WorkflowDefinition,
task: WorkflowTask,
taskExecution: TaskExecution,
retryPolicy: RetryPolicy
): Promise<void> {
taskExecution.attempts++;
const delay = this.calculateRetryDelay(retryPolicy, taskExecution.attempts);
this.addExecutionLog(
execution,
'info',
'task',
`Retrying task ${task.name} (attempt ${taskExecution.attempts}/${retryPolicy.maxAttempts}) after ${delay}ms`
);
await new Promise((resolve) => setTimeout(resolve, delay));
// Reset task status and retry
taskExecution.status = 'running';
await this.executeSingleTask(execution, workflow, task, taskExecution);
}
private calculateRetryDelay(policy: RetryPolicy, attempt: number): number {
switch (policy.backoff) {
case 'exponential':
return Math.min(
policy.initialDelay * Math.pow(policy.multiplier || 2, attempt - 1),
policy.maxDelay || 30000
);
case 'linear':
return Math.min(
policy.initialDelay * attempt,
policy.maxDelay || 30000
);
default:
return policy.initialDelay;
}
}
private async handleTaskApproval(
execution: WorkflowExecution,
task: WorkflowTask
): Promise<void> {
if (!task.approval) return;
const approvalExecution: ApprovalExecution = {
id: uuidv4(),
configId: task.approval.configId,
status: 'pending',
requestTime: new Date(),
context: task.approval.context || {},
};
execution.approvals.push(approvalExecution);
this.addExecutionLog(
execution,
'info',
'approval',
`Approval required for task: ${task.name}`
);
this.emit('approval-required', execution.id, approvalExecution);
// Wait for approval
return new Promise((resolve, reject) => {
this.approvalCallbacks.set(
approvalExecution.id,
(approved: boolean, comments?: string) => {
approvalExecution.status = approved ? 'approved' : 'rejected';
approvalExecution.responseTime = new Date();
approvalExecution.comments = comments;
if (approved) {
resolve();
} else {
reject(
new Error(
`Task approval rejected: ${comments || 'No comments provided'}`
)
);
}
}
);
});
}
private async executeFileOperationTask(
execution: WorkflowExecution,
task: WorkflowTask,
taskExecution: TaskExecution
): Promise<any> {
// Implement file operations
throw new Error('File operation task not implemented yet');
}
private async executeScriptTask(
execution: WorkflowExecution,
task: WorkflowTask,
taskExecution: TaskExecution
): Promise<any> {
// Implement script execution
throw new Error('Script task not implemented yet');
}
private async waitForExecution(
executionId: string
): Promise<WorkflowExecution> {
return new Promise((resolve, reject) => {
const checkStatus = () => {
const execution = this.executions.get(executionId);
if (!execution) {
reject(new Error(`Execution not found: ${executionId}`));
return;
}
if (
execution.status === 'completed' ||
execution.status === 'failed' ||
execution.status === 'cancelled'
) {
resolve(execution);
} else {
setTimeout(checkStatus, 1000);
}
};
checkStatus();
});
}
/**
* Approve a pending approval
*/
approveTask(approvalId: string, approved: boolean, comments?: string): void {
const callback = this.approvalCallbacks.get(approvalId);
if (callback) {
callback(approved, comments);
this.approvalCallbacks.delete(approvalId);
}
}
}