Skip to main content
Glama
ooples

MCP Console Automation Server

WorkflowEngine.ts35.1 kB
/** * Advanced Workflow Orchestration Engine * Handles complex multi-step automation workflows with conditional execution, * parallel processing, and error handling */ import { EventEmitter } from 'events'; import { v4 as uuidv4 } from 'uuid'; import PQueue from 'p-queue'; import { WorkflowDefinition, WorkflowExecution, WorkflowStatus, TaskExecution, TaskStatus, ExecutionContext, TaskCondition, StateMachine, State, Transition, ApprovalExecution, ExecutionError, ExecutionLog, WorkflowTask, ParallelExecution, LoopConfiguration, RetryPolicy, } from '../types/workflow.js'; import { ConsoleManager } from './ConsoleManager.js'; import { Logger } from '../utils/logger.js'; export class WorkflowEngine extends EventEmitter { private executions: Map<string, WorkflowExecution>; private workflows: Map<string, WorkflowDefinition>; private stateMachines: Map<string, StateMachine>; private taskQueues: Map<string, PQueue>; private consoleManager: ConsoleManager; private logger: Logger; private approvalCallbacks: Map< string, (approved: boolean, comments?: string) => void >; constructor(consoleManager: ConsoleManager) { super(); this.executions = new Map(); this.workflows = new Map(); this.stateMachines = new Map(); this.taskQueues = new Map(); this.consoleManager = consoleManager; this.logger = new Logger('WorkflowEngine'); this.approvalCallbacks = new Map(); } /** * Register a workflow definition */ registerWorkflow(definition: WorkflowDefinition): void { this.workflows.set(definition.id, definition); this.logger.info( `Workflow registered: ${definition.name} (${definition.id})` ); this.emit('workflow-registered', definition); } /** * Execute a workflow with given context */ async executeWorkflow( workflowId: string, context: ExecutionContext, variables?: Record<string, any> ): Promise<string> { const workflow = this.workflows.get(workflowId); if (!workflow) { throw new Error(`Workflow not found: ${workflowId}`); } const executionId = uuidv4(); const execution: WorkflowExecution = { id: executionId, workflowId, status: 'pending', startTime: new Date(), triggeredBy: { type: 'manual', source: 'api', timestamp: new Date(), }, context, tasks: [], approvals: [], variables: { ...workflow.variables.reduce( (acc, v) => ({ ...acc, [v.name]: v.defaultValue }), {} ), ...variables, }, artifacts: [], metrics: { totalTasks: workflow.tasks.length, completedTasks: 0, failedTasks: 0, skippedTasks: 0, avgTaskDuration: 0, resourceUsage: { peakMemory: 0, avgCpu: 0, diskUsed: 0, networkTraffic: 0, }, performance: { queueTime: 0, executionTime: 0, waitTime: 0, overhead: 0, }, }, logs: [], errors: [], }; this.executions.set(executionId, execution); this.createTaskQueue(executionId); try { await this.startExecution(execution, workflow); return executionId; } catch (error: any) { await this.handleExecutionError(execution, error); throw error; } } /** * Start workflow execution */ private async startExecution( execution: WorkflowExecution, workflow: WorkflowDefinition ): Promise<void> { this.updateExecutionStatus(execution, 'running'); this.addExecutionLog( execution, 'info', 'workflow', `Starting workflow: ${workflow.name}` ); try { // Initialize workflow variables await this.initializeVariables(execution, workflow); // Check if workflow uses state machine if (workflow.metadata.dependencies?.includes('state-machine')) { await this.executeStateMachineWorkflow(execution, workflow); } else { await this.executeSequentialWorkflow(execution, workflow); } } catch (error: any) { await this.handleExecutionError(execution, error); throw error; } } /** * Execute workflow using state machine approach */ private async executeStateMachineWorkflow( execution: WorkflowExecution, workflow: WorkflowDefinition ): Promise<void> { const stateMachine = this.createStateMachine(workflow); this.stateMachines.set(execution.id, stateMachine); let currentState = stateMachine.states.find((s) => s.type === 'initial'); if (!currentState) { throw new Error('No initial state found in state machine'); } while (currentState && currentState.type !== 'final') { this.addExecutionLog( execution, 'info', 'state-machine', `Entering state: ${currentState.name}` ); // Execute tasks in current state if (currentState.tasks) { await this.executeStateTasks(execution, workflow, currentState); } // Find next transition const transition = await this.findValidTransition( execution, stateMachine, currentState.id ); if (!transition) { if (currentState.type === 'error') { throw new Error( `Workflow stuck in error state: ${currentState.name}` ); } break; } // Execute transition actions if (transition.actions) { await this.executeTransitionActions(execution, transition.actions); } currentState = stateMachine.states.find((s) => s.id === transition.to); } this.updateExecutionStatus(execution, 'completed'); } /** * Execute workflow sequentially with dependency resolution */ private async executeSequentialWorkflow( execution: WorkflowExecution, workflow: WorkflowDefinition ): Promise<void> { const taskGraph = this.buildTaskDependencyGraph(workflow.tasks); const readyTasks = this.getReadyTasks(taskGraph, new Set()); const completedTasks = new Set<string>(); while (readyTasks.length > 0 || this.hasPendingTasks(execution)) { // Execute ready tasks const taskPromises = readyTasks.map((task) => this.executeTask(execution, workflow, task) ); const results = await Promise.allSettled(taskPromises); // Process results for (let i = 0; i < results.length; i++) { const task = readyTasks[i]; const result = results[i]; if (result.status === 'fulfilled') { completedTasks.add(task.id); execution.metrics.completedTasks++; } else { execution.metrics.failedTasks++; await this.handleTaskError(execution, task, result.reason); if (workflow.errorHandling.global.onError === 'fail') { throw result.reason; } } } // Find new ready tasks readyTasks.length = 0; readyTasks.push(...this.getReadyTasks(taskGraph, completedTasks)); // Check for deadlock if ( readyTasks.length === 0 && !this.allTasksCompleted(taskGraph, completedTasks) ) { throw new Error( 'Workflow deadlock detected - circular dependencies or missing tasks' ); } } this.updateExecutionStatus(execution, 'completed'); } /** * Execute a single task with all its features */ private async executeTask( execution: WorkflowExecution, workflow: WorkflowDefinition, task: WorkflowTask ): Promise<void> { const taskExecution: TaskExecution = { id: uuidv4(), taskId: task.id, status: 'pending', attempts: 0, input: task.input, metrics: { duration: 0, memoryUsage: 0, cpuUsage: 0, diskIO: 0, networkIO: 0, outputSize: 0, }, logs: [], }; execution.tasks.push(taskExecution); this.emit('task-started', execution.id, taskExecution); try { // Check task condition if ( task.condition && !(await this.evaluateCondition(execution, task.condition)) ) { taskExecution.status = 'skipped'; execution.metrics.skippedTasks++; this.addExecutionLog( execution, 'info', 'task', `Task skipped: ${task.name} (condition not met)` ); return; } // Handle approval requirement if (task.approval) { await this.handleTaskApproval(execution, task); } // Execute based on task type and execution mode taskExecution.status = 'running'; taskExecution.startTime = new Date(); if (task.executionMode === 'parallel' && task.parallel) { await this.executeParallelTask( execution, workflow, task, taskExecution ); } else if (task.loop) { await this.executeLoopTask(execution, workflow, task, taskExecution); } else { await this.executeSingleTask(execution, workflow, task, taskExecution); } taskExecution.endTime = new Date(); taskExecution.duration = taskExecution.endTime.getTime() - taskExecution.startTime!.getTime(); taskExecution.status = 'completed'; // Execute success actions if (task.onSuccess) { await this.executeTaskActions(execution, task.onSuccess); } } catch (error: any) { taskExecution.status = 'failed'; taskExecution.error = { timestamp: new Date(), source: task.id, type: error.constructor.name, message: error.message, stack: error.stack, context: { taskId: task.id, input: task.input }, recoverable: this.isRecoverableError(error), }; // Handle retry logic const retryPolicy = task.retryPolicy || workflow.errorHandling.global.retryPolicy; if (retryPolicy && taskExecution.attempts < retryPolicy.maxAttempts) { await this.retryTask( execution, workflow, task, taskExecution, retryPolicy ); return; } // Execute failure actions if (task.onFailure) { await this.executeTaskActions(execution, task.onFailure); } throw error; } finally { this.emit('task-completed', execution.id, taskExecution); } } /** * Execute parallel tasks with concurrency control */ private async executeParallelTask( execution: WorkflowExecution, workflow: WorkflowDefinition, task: WorkflowTask, taskExecution: TaskExecution ): Promise<void> { if (!task.parallel) return; const queue = new PQueue({ concurrency: task.parallel.maxConcurrency, }); const subtasks = this.generateParallelSubtasks(task); const results: any[] = []; try { const promises = subtasks.map((subtask, index) => queue.add(async () => { try { const result = await this.executeSingleTask( execution, workflow, subtask, taskExecution ); results[index] = result; return result; } catch (error) { if (task.parallel!.failFast) { queue.clear(); throw error; } results[index] = { error: error.message }; return null; } }) ); await Promise.all(promises); // Aggregate results based on strategy taskExecution.output = this.aggregateParallelResults( results, task.parallel.aggregationStrategy ); } finally { queue.clear(); } } /** * Execute task with loop configuration */ private async executeLoopTask( execution: WorkflowExecution, workflow: WorkflowDefinition, task: WorkflowTask, taskExecution: TaskExecution ): Promise<void> { if (!task.loop) return; const results: any[] = []; let iteration = 0; while (iteration < task.loop.maxIterations) { // Check loop condition if ( task.loop.condition && !(await this.evaluateCondition(execution, task.loop.condition)) ) { break; } // Check break condition if ( task.loop.breakCondition && (await this.evaluateCondition(execution, task.loop.breakCondition)) ) { break; } // Set loop variables if (task.loop.type === 'foreach' && task.loop.items) { const items = execution.variables[task.loop.items] as any[]; if (!items || iteration >= items.length) break; execution.variables['loop_item'] = items[iteration]; } execution.variables['loop_index'] = iteration; // Execute task iteration try { const result = await this.executeSingleTask( execution, workflow, task, taskExecution ); results.push(result); } catch (error: any) { if (workflow.errorHandling.global.onError === 'fail') { throw error; } results.push({ error: error.message, iteration }); } iteration++; // For 'for' loop, check iterations if ( task.loop.type === 'for' && task.loop.iterations && iteration >= task.loop.iterations ) { break; } } taskExecution.output = { iterations: iteration, results }; } /** * Execute a single task based on its type */ private async executeSingleTask( execution: WorkflowExecution, workflow: WorkflowDefinition, task: WorkflowTask, taskExecution: TaskExecution ): Promise<any> { const startTime = Date.now(); try { let result: any; switch (task.type) { case 'command': result = await this.executeCommandTask( execution, task, taskExecution ); break; case 'script': result = await this.executeScriptTask(execution, task, taskExecution); break; case 'api_call': result = await this.executeApiCallTask( execution, task, taskExecution ); break; case 'file_operation': result = await this.executeFileOperationTask( execution, task, taskExecution ); break; case 'condition': result = await this.executeConditionTask( execution, task, taskExecution ); break; case 'wait': result = await this.executeWaitTask(execution, task, taskExecution); break; case 'notification': result = await this.executeNotificationTask( execution, task, taskExecution ); break; case 'subworkflow': result = await this.executeSubworkflowTask( execution, task, taskExecution ); break; default: throw new Error(`Unsupported task type: ${task.type}`); } // Update task output if (task.output && result) { if (task.output.variables) { Object.entries(task.output.variables).forEach(([key, expression]) => { execution.variables[key] = this.evaluateExpression( expression, result, execution.variables ); }); } } taskExecution.metrics.duration = Date.now() - startTime; return result; } catch (error: any) { taskExecution.metrics.duration = Date.now() - startTime; this.addExecutionLog( execution, 'error', 'task', `Task failed: ${task.name} - ${error.message}` ); throw error; } } /** * Execute command task using ConsoleManager */ private async executeCommandTask( execution: WorkflowExecution, task: WorkflowTask, taskExecution: TaskExecution ): Promise<any> { if (!task.input.command) { throw new Error('Command task requires command in input'); } const command = this.interpolateVariables( task.input.command, execution.variables ); const args = task.input.args?.map((arg) => this.interpolateVariables(arg, execution.variables) ); this.addExecutionLog( execution, 'info', 'task', `Executing command: ${command} ${args?.join(' ') || ''}` ); try { const result = await this.consoleManager.executeCommand(command, args, { cwd: task.input.variables?.cwd as string, env: task.input.variables?.env as Record<string, string>, timeout: task.timeout, detectErrors: true, }); taskExecution.output = result; taskExecution.exitCode = result.exitCode; return result; } catch (error: any) { throw new Error(`Command execution failed: ${error.message}`); } } /** * Execute API call task */ private async executeApiCallTask( execution: WorkflowExecution, task: WorkflowTask, taskExecution: TaskExecution ): Promise<any> { if (!task.input.url) { throw new Error('API call task requires URL in input'); } const url = this.interpolateVariables(task.input.url, execution.variables); const method = task.input.method || 'GET'; const headers = task.input.headers || {}; const body = task.input.body; this.addExecutionLog( execution, 'info', 'task', `Making API call: ${method} ${url}` ); try { const response = await fetch(url, { method, headers, body: body ? JSON.stringify(body) : undefined, }); if (!response.ok) { throw new Error(`HTTP ${response.status}: ${response.statusText}`); } const result = await response.json(); taskExecution.output = result; return result; } catch (error: any) { throw new Error(`API call failed: ${error.message}`); } } /** * Execute condition task */ private async executeConditionTask( execution: WorkflowExecution, task: WorkflowTask, taskExecution: TaskExecution ): Promise<any> { if (!task.condition) { throw new Error('Condition task requires condition configuration'); } const result = await this.evaluateCondition(execution, task.condition); taskExecution.output = { result }; this.addExecutionLog( execution, 'info', 'task', `Condition evaluated to: ${result}` ); return { result }; } /** * Execute wait task */ private async executeWaitTask( execution: WorkflowExecution, task: WorkflowTask, taskExecution: TaskExecution ): Promise<any> { const duration = (task.input.variables?.duration as number) || 1000; this.addExecutionLog( execution, 'info', 'task', `Waiting for ${duration}ms` ); await new Promise((resolve) => setTimeout(resolve, duration)); return { waited: duration }; } /** * Execute notification task */ private async executeNotificationTask( execution: WorkflowExecution, task: WorkflowTask, taskExecution: TaskExecution ): Promise<any> { const message = this.interpolateVariables( (task.input.variables?.message as string) || 'Workflow notification', execution.variables ); this.addExecutionLog( execution, 'info', 'task', `Sending notification: ${message}` ); // Emit notification event this.emit('notification', { executionId: execution.id, taskId: task.id, message, timestamp: new Date(), }); return { sent: true, message }; } /** * Execute subworkflow task */ private async executeSubworkflowTask( execution: WorkflowExecution, task: WorkflowTask, taskExecution: TaskExecution ): Promise<any> { const subworkflowId = task.input.variables?.workflowId as string; if (!subworkflowId) { throw new Error('Subworkflow task requires workflowId'); } this.addExecutionLog( execution, 'info', 'task', `Executing subworkflow: ${subworkflowId}` ); const subExecutionId = await this.executeWorkflow( subworkflowId, { ...execution.context }, task.input.variables as Record<string, any> ); // Wait for completion const subExecution = await this.waitForExecution(subExecutionId); return { executionId: subExecutionId, status: subExecution.status, output: subExecution.variables, }; } // ... Additional helper methods /** * Evaluate task condition */ private async evaluateCondition( execution: WorkflowExecution, condition: TaskCondition ): Promise<boolean> { switch (condition.type) { case 'expression': return this.evaluateExpression( condition.expression!, null, execution.variables ) as boolean; case 'variable': const variable = execution.variables[condition.variables![0]]; return this.compareValues( variable, condition.operator!, condition.value ); case 'previous_task': const taskResult = execution.tasks.find( (t) => t.taskId === condition.variables![0] ); return taskResult?.status === 'completed'; default: return true; } } private compareValues(actual: any, operator: string, expected: any): boolean { switch (operator) { case 'eq': return actual === expected; case 'ne': return actual !== expected; case 'gt': return actual > expected; case 'lt': return actual < expected; case 'gte': return actual >= expected; case 'lte': return actual <= expected; case 'contains': return String(actual).includes(String(expected)); case 'matches': return new RegExp(expected).test(String(actual)); case 'exists': return actual !== undefined && actual !== null; default: return false; } } /** * Interpolate variables in string templates */ private interpolateVariables( template: string, variables: Record<string, any> ): string { return template.replace(/\{\{(\w+)\}\}/g, (match, key) => { return variables[key] ?? match; }); } /** * Evaluate expression with context */ private evaluateExpression( expression: string, data: any, variables: Record<string, any> ): any { // Simple expression evaluation - in production, use a proper expression engine try { const context = { ...variables, data }; const func = new Function( ...Object.keys(context), `return ${expression}` ); return func(...Object.values(context)); } catch { return false; } } /** * Update execution status and emit events */ private updateExecutionStatus( execution: WorkflowExecution, status: WorkflowStatus ): void { execution.status = status; if ( status === 'completed' || status === 'failed' || status === 'cancelled' ) { execution.endTime = new Date(); execution.duration = execution.endTime.getTime() - execution.startTime.getTime(); } this.emit('execution-status-changed', execution.id, status); } /** * Add execution log entry */ private addExecutionLog( execution: WorkflowExecution, level: 'debug' | 'info' | 'warn' | 'error', source: string, message: string, data?: any ): void { const log: ExecutionLog = { timestamp: new Date(), level, source, message, data, }; execution.logs.push(log); this.logger[level](`[${execution.id}] ${message}`, data); } /** * Execute task actions (onSuccess, onError, etc.) */ private async executeTaskActions( execution: WorkflowExecution, actions: any[] ): Promise<void> { for (const action of actions) { this.addExecutionLog( execution, 'info', 'task-action', `Executing action: ${action.type || 'custom'}` ); // Implementation would depend on action types defined in workflow } } // ... Additional utility methods for task queues, dependency graphs, approvals, etc. /** * Get execution by ID */ getExecution(executionId: string): WorkflowExecution | undefined { return this.executions.get(executionId); } /** * Cancel workflow execution */ async cancelExecution(executionId: string): Promise<void> { const execution = this.executions.get(executionId); if (!execution) { throw new Error(`Execution not found: ${executionId}`); } this.updateExecutionStatus(execution, 'cancelled'); this.addExecutionLog( execution, 'info', 'workflow', 'Workflow cancelled by user' ); // Cancel running tasks const runningTasks = execution.tasks.filter((t) => t.status === 'running'); for (const task of runningTasks) { if (task.sessionId) { await this.consoleManager.stopSession(task.sessionId); } } this.emit('execution-cancelled', executionId); } /** * Get all executions */ getAllExecutions(): WorkflowExecution[] { return Array.from(this.executions.values()); } /** * Clean up completed executions */ cleanup(olderThanHours: number = 24): void { const cutoff = Date.now() - olderThanHours * 60 * 60 * 1000; Array.from(this.executions.entries()).forEach(([id, execution]) => { if (execution.endTime && execution.endTime.getTime() < cutoff) { this.executions.delete(id); this.taskQueues.delete(id); this.stateMachines.delete(id); } }); } // Additional private helper methods would go here... private createTaskQueue(executionId: string): void { this.taskQueues.set(executionId, new PQueue({ concurrency: 5 })); } private async initializeVariables( execution: WorkflowExecution, workflow: WorkflowDefinition ): Promise<void> { // Initialize workflow variables from definition for (const variable of workflow.variables) { if (!(variable.name in execution.variables)) { execution.variables[variable.name] = variable.defaultValue; } } } private buildTaskDependencyGraph( tasks: WorkflowTask[] ): Map<string, WorkflowTask> { return new Map(tasks.map((task) => [task.id, task])); } private getReadyTasks( taskGraph: Map<string, WorkflowTask>, completed: Set<string> ): WorkflowTask[] { const ready: WorkflowTask[] = []; Array.from(taskGraph.values()).forEach((task) => { const allDependenciesCompleted = task.dependsOn.every((dep) => completed.has(dep) ); if (allDependenciesCompleted && !completed.has(task.id)) { ready.push(task); } }); return ready; } private hasPendingTasks(execution: WorkflowExecution): boolean { return execution.tasks.some( (t) => t.status === 'running' || t.status === 'pending' ); } private allTasksCompleted( taskGraph: Map<string, WorkflowTask>, completed: Set<string> ): boolean { return taskGraph.size === completed.size; } private createStateMachine(workflow: WorkflowDefinition): StateMachine { // Create a basic state machine from workflow tasks // In practice, this would be more sophisticated return { id: workflow.id, name: workflow.name, initialState: 'start', states: [ { id: 'start', name: 'Start', type: 'initial', tasks: [] }, { id: 'end', name: 'End', type: 'final', tasks: [] }, ], transitions: [ { id: 'start-to-end', from: 'start', to: 'end', actions: [] }, ], context: {}, }; } private async executeStateTasks( execution: WorkflowExecution, workflow: WorkflowDefinition, state: State ): Promise<void> { if (!state.tasks) return; for (const taskId of state.tasks) { const task = workflow.tasks.find((t) => t.id === taskId); if (task) { await this.executeTask(execution, workflow, task); } } } private async findValidTransition( execution: WorkflowExecution, stateMachine: StateMachine, currentStateId: string ): Promise<Transition | undefined> { const transitions = stateMachine.transitions.filter( (t) => t.from === currentStateId ); for (const transition of transitions) { if ( !transition.condition || (await this.evaluateCondition(execution, transition.condition)) ) { return transition; } } return undefined; } private async executeTransitionActions( execution: WorkflowExecution, actions: any[] ): Promise<void> { // Execute state transition actions for (const action of actions) { // Implementation depends on action types this.addExecutionLog( execution, 'info', 'state-machine', `Executing transition action: ${action.type}` ); } } private async handleTaskError( execution: WorkflowExecution, task: WorkflowTask, error: any ): Promise<void> { const executionError: ExecutionError = { timestamp: new Date(), source: task.id, type: error.constructor.name, message: error.message, stack: error.stack, context: { taskId: task.id }, recoverable: this.isRecoverableError(error), }; execution.errors.push(executionError); this.addExecutionLog( execution, 'error', 'task', `Task error: ${task.name} - ${error.message}` ); } private async handleExecutionError( execution: WorkflowExecution, error: any ): Promise<void> { this.updateExecutionStatus(execution, 'failed'); this.addExecutionLog( execution, 'error', 'workflow', `Workflow failed: ${error.message}` ); } private isRecoverableError(error: any): boolean { // Implement logic to determine if error is recoverable return error.code !== 'FATAL'; } private generateParallelSubtasks(task: WorkflowTask): WorkflowTask[] { // Generate subtasks for parallel execution // This is a simplified implementation return [task]; // In practice, split task into parallel subtasks } private aggregateParallelResults(results: any[], strategy: string): any { switch (strategy) { case 'array': return results; case 'first': return results[0]; case 'last': return results[results.length - 1]; case 'merge': return Object.assign({}, ...results); default: return results; } } private async retryTask( execution: WorkflowExecution, workflow: WorkflowDefinition, task: WorkflowTask, taskExecution: TaskExecution, retryPolicy: RetryPolicy ): Promise<void> { taskExecution.attempts++; const delay = this.calculateRetryDelay(retryPolicy, taskExecution.attempts); this.addExecutionLog( execution, 'info', 'task', `Retrying task ${task.name} (attempt ${taskExecution.attempts}/${retryPolicy.maxAttempts}) after ${delay}ms` ); await new Promise((resolve) => setTimeout(resolve, delay)); // Reset task status and retry taskExecution.status = 'running'; await this.executeSingleTask(execution, workflow, task, taskExecution); } private calculateRetryDelay(policy: RetryPolicy, attempt: number): number { switch (policy.backoff) { case 'exponential': return Math.min( policy.initialDelay * Math.pow(policy.multiplier || 2, attempt - 1), policy.maxDelay || 30000 ); case 'linear': return Math.min( policy.initialDelay * attempt, policy.maxDelay || 30000 ); default: return policy.initialDelay; } } private async handleTaskApproval( execution: WorkflowExecution, task: WorkflowTask ): Promise<void> { if (!task.approval) return; const approvalExecution: ApprovalExecution = { id: uuidv4(), configId: task.approval.configId, status: 'pending', requestTime: new Date(), context: task.approval.context || {}, }; execution.approvals.push(approvalExecution); this.addExecutionLog( execution, 'info', 'approval', `Approval required for task: ${task.name}` ); this.emit('approval-required', execution.id, approvalExecution); // Wait for approval return new Promise((resolve, reject) => { this.approvalCallbacks.set( approvalExecution.id, (approved: boolean, comments?: string) => { approvalExecution.status = approved ? 'approved' : 'rejected'; approvalExecution.responseTime = new Date(); approvalExecution.comments = comments; if (approved) { resolve(); } else { reject( new Error( `Task approval rejected: ${comments || 'No comments provided'}` ) ); } } ); }); } private async executeFileOperationTask( execution: WorkflowExecution, task: WorkflowTask, taskExecution: TaskExecution ): Promise<any> { // Implement file operations throw new Error('File operation task not implemented yet'); } private async executeScriptTask( execution: WorkflowExecution, task: WorkflowTask, taskExecution: TaskExecution ): Promise<any> { // Implement script execution throw new Error('Script task not implemented yet'); } private async waitForExecution( executionId: string ): Promise<WorkflowExecution> { return new Promise((resolve, reject) => { const checkStatus = () => { const execution = this.executions.get(executionId); if (!execution) { reject(new Error(`Execution not found: ${executionId}`)); return; } if ( execution.status === 'completed' || execution.status === 'failed' || execution.status === 'cancelled' ) { resolve(execution); } else { setTimeout(checkStatus, 1000); } }; checkStatus(); }); } /** * Approve a pending approval */ approveTask(approvalId: string, approved: boolean, comments?: string): void { const callback = this.approvalCallbacks.get(approvalId); if (callback) { callback(approved, comments); this.approvalCallbacks.delete(approvalId); } } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ooples/mcp-console-automation'

If you have feedback or need assistance with the MCP directory API, please join our Discord server