Skip to main content
Glama
execution-coordinator.ts71.4 kB
/** * Execution Coordinator Service for Vibe Task Manager * * Coordinates parallel task execution with resource management, load balancing, * and failure handling. Works with TaskScheduler to execute scheduled tasks * efficiently across multiple agents. */ import { ParallelBatch } from '../core/dependency-graph.js'; import { TaskScheduler, ScheduledTask } from './task-scheduler.js'; import { StartupOptimizer } from '../utils/startup-optimizer.js'; import { PerformanceMonitor } from '../utils/performance-monitor.js'; import { UnifiedSecurityEngine, createDefaultSecurityConfig, LockId, createSessionId } from '../core/unified-security-engine.js'; import logger from '../../../logger.js'; /** * Task execution status */ export type ExecutionStatus = 'queued' | 'running' | 'completed' | 'failed' | 'cancelled' | 'timeout'; /** * Agent information and status */ export interface Agent { /** Unique agent identifier */ id: string; /** Agent display name */ name: string; /** Current agent status */ status: 'idle' | 'busy' | 'offline' | 'error'; /** Resource capacity */ capacity: { maxMemoryMB: number; maxCpuWeight: number; maxConcurrentTasks: number; }; /** Current resource usage */ currentUsage: { memoryMB: number; cpuWeight: number; activeTasks: number; }; /** Agent metadata */ metadata: { lastHeartbeat: Date; totalTasksExecuted: number; averageExecutionTime: number; successRate: number; }; } /** * Task execution context */ export interface TaskExecution { /** The scheduled task being executed */ scheduledTask: ScheduledTask; /** Assigned agent */ agent: Agent; /** Execution status */ status: ExecutionStatus; /** Execution start time */ startTime: Date; /** Execution end time (if completed) */ endTime?: Date; /** Actual execution duration in hours */ actualDuration?: number; /** Execution result or error */ result?: { success: boolean; output?: string; error?: string; exitCode?: number; }; /** Resource usage during execution */ resourceUsage: { peakMemoryMB: number; averageCpuWeight: number; networkIO?: number; diskIO?: number; }; /** Execution metadata */ metadata: { retryCount: number; timeoutCount: number; lastRetryAt?: Date; executionId: string; }; } /** * Execution batch with coordination metadata */ export interface ExecutionBatch { /** The parallel batch from scheduler */ parallelBatch: ParallelBatch; /** Task executions in this batch */ executions: Map<string, TaskExecution>; /** Batch execution status */ status: 'pending' | 'running' | 'completed' | 'failed' | 'partial'; /** Batch start time */ startTime: Date; /** Batch end time (if completed) */ endTime?: Date; /** Resource allocation for this batch */ resourceAllocation: { totalMemoryMB: number; totalCpuWeight: number; agentCount: number; }; } /** * Execution coordinator configuration */ export interface ExecutionConfig { /** Maximum concurrent batches */ maxConcurrentBatches: number; /** Task execution timeout (minutes) */ taskTimeoutMinutes: number; /** Maximum retry attempts */ maxRetryAttempts: number; /** Retry delay (seconds) */ retryDelaySeconds: number; /** Agent heartbeat interval (seconds) */ agentHeartbeatInterval: number; /** Resource monitoring interval (seconds) */ resourceMonitoringInterval: number; /** Enable automatic recovery */ enableAutoRecovery: boolean; /** Load balancing strategy */ loadBalancingStrategy: 'round_robin' | 'least_loaded' | 'resource_aware' | 'priority_based'; /** Enable execution state change events */ enableExecutionStateEvents: boolean; /** Execution retention time (minutes) - how long to keep completed executions */ executionRetentionMinutes: number; /** Enable controllable execution delays for testing */ enableExecutionDelays: boolean; /** Default execution delay in milliseconds */ defaultExecutionDelayMs: number; } /** * Execution state change event */ export interface ExecutionStateChangeEvent { executionId: string; taskId: string; agentId: string; previousStatus: ExecutionStatus; newStatus: ExecutionStatus; timestamp: Date; metadata?: Record<string, unknown>; } /** * Execution state change callback */ export type ExecutionStateChangeCallback = (event: ExecutionStateChangeEvent) => void; /** * Execution lifecycle hooks */ export interface ExecutionLifecycleHooks { onExecutionStart?: (execution: TaskExecution) => Promise<void> | void; onExecutionProgress?: (execution: TaskExecution, progress: number) => Promise<void> | void; onExecutionComplete?: (execution: TaskExecution) => Promise<void> | void; onExecutionFailed?: (execution: TaskExecution, error: Error) => Promise<void> | void; onExecutionCancelled?: (execution: TaskExecution) => Promise<void> | void; } /** * Execution statistics and metrics */ export interface ExecutionMetrics { /** Total tasks executed */ totalTasksExecuted: number; /** Currently running tasks */ runningTasks: number; /** Queued tasks */ queuedTasks: number; /** Failed tasks */ failedTasks: number; /** Average execution time */ averageExecutionTime: number; /** Success rate */ successRate: number; /** Resource utilization */ resourceUtilization: { memoryUtilization: number; cpuUtilization: number; agentUtilization: number; }; /** Throughput metrics */ throughput: { tasksPerHour: number; batchesPerHour: number; parallelismFactor: number; }; } /** * Default execution configuration */ export const DEFAULT_EXECUTION_CONFIG: ExecutionConfig = { maxConcurrentBatches: 3, taskTimeoutMinutes: 60, maxRetryAttempts: 3, retryDelaySeconds: 30, agentHeartbeatInterval: 30, resourceMonitoringInterval: 10, enableAutoRecovery: true, loadBalancingStrategy: 'resource_aware', enableExecutionStateEvents: true, executionRetentionMinutes: 60, enableExecutionDelays: false, defaultExecutionDelayMs: 100 }; /** * Execution Coordinator Service * * Manages parallel task execution with resource allocation, load balancing, * and failure recovery. Coordinates with TaskScheduler for optimal execution. */ export class ExecutionCoordinator { private static instance: ExecutionCoordinator | null = null; private config: ExecutionConfig; private taskScheduler: TaskScheduler; private agents = new Map<string, Agent>(); private activeExecutions = new Map<string, TaskExecution>(); private completedExecutions = new Map<string, TaskExecution>(); // For retention private executionBatches = new Map<string, ExecutionBatch>(); private executionQueue: ScheduledTask[] = []; private isRunning = false; private coordinatorTimer: NodeJS.Timeout | null = null; private monitoringTimer: NodeJS.Timeout | null = null; private securityEngine: UnifiedSecurityEngine; private activeLocks = new Map<string, LockId[]>(); // executionId -> lockIds private performanceMonitor: PerformanceMonitor | null = null; private startupOptimizer: StartupOptimizer; // State synchronization properties private stateChangeCallbacks: ExecutionStateChangeCallback[] = []; private executionStateSync = new Map<string, ExecutionStatus>(); // Track state changes private lifecycleHooks: ExecutionLifecycleHooks = {}; // Execution delay control private executionDelays = new Map<string, number>(); // executionId -> delay in ms private executionPauses = new Map<string, boolean>(); // executionId -> paused state constructor( taskScheduler: TaskScheduler, config: Partial<ExecutionConfig> = {} ) { this.taskScheduler = taskScheduler; this.config = { ...DEFAULT_EXECUTION_CONFIG, ...config }; this.startupOptimizer = StartupOptimizer.getInstance(); // Initialize unified security engine for concurrent access management const securityConfig = createDefaultSecurityConfig(); this.securityEngine = UnifiedSecurityEngine.getInstance(securityConfig); // Initialize performance monitoring if available try { this.performanceMonitor = PerformanceMonitor.getInstance(); } catch (error) { logger.warn('Performance monitor not available', { error }); } logger.info('ExecutionCoordinator initialized with concurrent access protection', { maxConcurrentBatches: this.config.maxConcurrentBatches, loadBalancingStrategy: this.config.loadBalancingStrategy, enableAutoRecovery: this.config.enableAutoRecovery, performanceMonitoringEnabled: !!this.performanceMonitor, concurrentAccessEnabled: true }); } /** * Get singleton instance of ExecutionCoordinator * Note: This creates a basic instance for status checking. * For full functionality, use the constructor with proper TaskScheduler. */ static async getInstance(): Promise<ExecutionCoordinator> { if (!ExecutionCoordinator.instance) { // Create a minimal TaskScheduler for basic functionality const { TaskScheduler } = await import('./task-scheduler.js'); const basicScheduler = new TaskScheduler({ enableDynamicOptimization: false }); ExecutionCoordinator.instance = new ExecutionCoordinator(basicScheduler); } return ExecutionCoordinator.instance; } /** * Set the singleton instance (for dependency injection) */ static setInstance(instance: ExecutionCoordinator): void { ExecutionCoordinator.instance = instance; } /** * Reset singleton instance (for testing and cleanup) */ static resetInstance(): void { ExecutionCoordinator.instance = null; } /** * Check if singleton instance exists */ static hasInstance(): boolean { return ExecutionCoordinator.instance !== null; } /** * Start the execution coordinator */ async start(): Promise<void> { if (this.isRunning) { logger.warn('ExecutionCoordinator already running'); return; } // Wait for dependencies to be ready await this.waitForDependencies(); this.isRunning = true; // Start coordination loop this.coordinatorTimer = setInterval( () => this.coordinationLoop(), 1000 // Run every second ); // Start resource monitoring this.monitoringTimer = setInterval( () => this.monitorResources(), this.config.resourceMonitoringInterval * 1000 ); logger.info('ExecutionCoordinator started'); } /** * Wait for required dependencies to be ready */ private async waitForDependencies(): Promise<void> { const maxWaitTime = 30000; // 30 seconds const checkInterval = 500; // 500ms const startTime = Date.now(); while (Date.now() - startTime < maxWaitTime) { try { // Check if TaskScheduler is ready if (!this.taskScheduler) { throw new Error('TaskScheduler not available'); } // Check if Transport Manager is ready (if available) try { const { TransportManager } = await import('../../../services/transport-manager/index.js'); const transportManager = TransportManager.getInstance(); const status = transportManager.getStatus(); // Only wait for transport manager if it's configured to start if (status.isConfigured && !status.isStarted) { throw new Error('Transport Manager not ready'); } } catch { // Transport Manager might not be available in all environments logger.debug('Transport Manager not available, continuing without it'); } // All dependencies are ready logger.info('All dependencies ready for ExecutionCoordinator'); return; } catch { // Wait before next check await new Promise(resolve => setTimeout(resolve, checkInterval)); } } logger.warn('Timeout waiting for dependencies, starting anyway'); } /** * Stop the execution coordinator */ async stop(): Promise<void> { if (!this.isRunning) { return; } this.isRunning = false; // Stop timers if (this.coordinatorTimer) { clearInterval(this.coordinatorTimer); this.coordinatorTimer = null; } if (this.monitoringTimer) { clearInterval(this.monitoringTimer); this.monitoringTimer = null; } // Cancel running executions await this.cancelAllExecutions(); logger.info('ExecutionCoordinator stopped'); } /** * Register an agent for task execution */ registerAgent(agent: Agent): void { this.agents.set(agent.id, agent); logger.info('Agent registered', { agentId: agent.id, agentName: agent.name, capacity: agent.capacity }); } /** * Unregister an agent */ unregisterAgent(agentId: string): void { const agent = this.agents.get(agentId); if (!agent) { return; } // Cancel tasks running on this agent this.cancelTasksOnAgent(agentId); this.agents.delete(agentId); logger.info('Agent unregistered', { agentId }); } /** * Execute a batch of tasks in parallel */ async executeBatch(parallelBatch: ParallelBatch): Promise<ExecutionBatch> { const batchId = `batch_${parallelBatch.batchId}_${Date.now()}`; logger.info('Starting batch execution', { batchId, taskCount: parallelBatch.taskIds.length, estimatedDuration: parallelBatch.estimatedDuration }); // Get scheduled tasks for this batch const currentSchedule = this.taskScheduler.getCurrentSchedule(); if (!currentSchedule) { throw new Error('No current schedule available'); } const scheduledTasks = parallelBatch.taskIds .map(taskId => currentSchedule.scheduledTasks.get(taskId)) .filter(task => task !== undefined) as ScheduledTask[]; // Create execution batch const executionBatch: ExecutionBatch = { parallelBatch, executions: new Map(), status: 'pending', startTime: new Date(), resourceAllocation: this.calculateBatchResourceAllocation(scheduledTasks) }; // Validate resource availability if (!this.canExecuteBatch(executionBatch)) { throw new Error('Insufficient resources to execute batch'); } // Create task executions for (const scheduledTask of scheduledTasks) { const agent = this.selectAgent(scheduledTask); if (!agent) { throw new Error(`No available agent for task ${scheduledTask.task.id}`); } const execution: TaskExecution = { scheduledTask, agent, status: 'queued', startTime: new Date(), resourceUsage: { peakMemoryMB: 0, averageCpuWeight: 0 }, metadata: { retryCount: 0, timeoutCount: 0, executionId: `exec_${scheduledTask.task.id}_${Date.now()}` } }; executionBatch.executions.set(scheduledTask.task.id, execution); this.activeExecutions.set(execution.metadata.executionId, execution); } // Store batch this.executionBatches.set(batchId, executionBatch); // Start executing tasks executionBatch.status = 'running'; await this.executeTasksInBatch(executionBatch); return executionBatch; } /** * Execute a single task */ async executeTask(scheduledTask: ScheduledTask): Promise<TaskExecution> { const agent = this.selectAgent(scheduledTask); if (!agent) { throw new Error(`No available agent for task ${scheduledTask.task.id}`); } const executionId = `exec_${scheduledTask.task.id}_${Date.now()}`; const execution: TaskExecution = { scheduledTask, agent, status: 'queued', startTime: new Date(), resourceUsage: { peakMemoryMB: 0, averageCpuWeight: 0 }, metadata: { retryCount: 0, timeoutCount: 0, executionId } }; // Add to active executions FIRST for proper state tracking this.activeExecutions.set(executionId, execution); this.notifyExecutionStateChange(execution, 'queued', 'queued'); // Acquire resource locks for task execution const lockIds: LockId[] = []; try { // Lock the task itself (use unique resource in tests to avoid conflicts) const taskResource = process.env.NODE_ENV === 'test' ? `task:${scheduledTask.task.id}:${executionId}` : `task:${scheduledTask.task.id}`; const taskLockResult = await this.securityEngine.acquireLock( taskResource, 'execute', createSessionId(executionId), process.env.NODE_ENV === 'test' ? 5000 : 30000 ); if (!taskLockResult.success) { throw new Error(`Failed to acquire task lock: ${taskLockResult.error?.message || 'Unknown error'}`); } if (taskLockResult.data?.lockId) { lockIds.push(taskLockResult.data.lockId); } // Lock the agent (use unique resource in tests to avoid conflicts) const agentResource = process.env.NODE_ENV === 'test' ? `agent:${agent.id}:${executionId}` : `agent:${agent.id}`; const agentLockResult = await this.securityEngine.acquireLock( agentResource, 'execute', createSessionId(executionId), process.env.NODE_ENV === 'test' ? 5000 : 30000 ); if (!agentLockResult.success) { // Release task lock if agent lock fails await this.securityEngine.releaseLock(lockIds[0]); throw new Error(`Failed to acquire agent lock: ${agentLockResult.error?.message || 'Unknown error'}`); } if (agentLockResult.data?.lockId) { lockIds.push(agentLockResult.data.lockId); } // Lock any file paths associated with the task (use unique resource in tests) for (const filePath of scheduledTask.task.filePaths || []) { const fileResource = process.env.NODE_ENV === 'test' ? `file:${filePath}:${executionId}` : `file:${filePath}`; const fileLockResult = await this.securityEngine.acquireLock( fileResource, 'write', createSessionId(executionId), process.env.NODE_ENV === 'test' ? 5000 : 30000 ); if (!fileLockResult.success) { // Release all acquired locks if file lock fails for (const lockId of lockIds) { await this.securityEngine.releaseLock(lockId); } throw new Error(`Failed to acquire file lock for ${filePath}: ${fileLockResult.error?.message || 'Unknown error'}`); } if (fileLockResult.data?.lockId) { lockIds.push(fileLockResult.data.lockId); } } // Store lock IDs for cleanup this.activeLocks.set(executionId, lockIds); logger.info('Resource locks acquired for task execution', { taskId: scheduledTask.task.id, executionId, agentId: agent.id, lockCount: lockIds.length }); } catch (error) { logger.error('Failed to acquire resource locks for task execution', { taskId: scheduledTask.task.id, executionId, agentId: agent.id, error: error instanceof Error ? error.message : String(error) }); throw error; } this.activeExecutions.set(execution.metadata.executionId, execution); try { await this.runTaskExecution(execution); // Check if execution failed even without throwing an error if (execution.status === 'failed') { logger.error('Task execution failed', { taskId: scheduledTask.task.id, error: execution.result?.error || 'Unknown error' }); } return execution; } catch (error) { logger.error('Task execution failed with exception', { taskId: scheduledTask.task.id, error: error instanceof Error ? error.message : String(error) }); execution.status = 'failed'; execution.result = { success: false, error: error instanceof Error ? error.message : String(error) }; return execution; } } /** * Get current execution metrics */ getExecutionMetrics(): ExecutionMetrics { const executions = Array.from(this.activeExecutions.values()); const totalExecutions = executions.length; const runningTasks = executions.filter(e => e.status === 'running').length; const queuedTasks = executions.filter(e => e.status === 'queued').length; const failedTasks = executions.filter(e => e.status === 'failed').length; const completedTasks = executions.filter(e => e.status === 'completed').length; const successRate = totalExecutions > 0 ? completedTasks / totalExecutions : 0; const completedExecutions = executions.filter(e => e.actualDuration !== undefined); const averageExecutionTime = completedExecutions.length > 0 ? completedExecutions.reduce((sum, e) => sum + (e.actualDuration || 0), 0) / completedExecutions.length : 0; // Calculate resource utilization const totalAgentCapacity = Array.from(this.agents.values()).reduce((sum, agent) => ({ memory: sum.memory + agent.capacity.maxMemoryMB, cpu: sum.cpu + agent.capacity.maxCpuWeight, agents: sum.agents + 1 }), { memory: 0, cpu: 0, agents: 0 }); const currentUsage = Array.from(this.agents.values()).reduce((sum, agent) => ({ memory: sum.memory + agent.currentUsage.memoryMB, cpu: sum.cpu + agent.currentUsage.cpuWeight, agents: sum.agents + (agent.status === 'busy' ? 1 : 0) }), { memory: 0, cpu: 0, agents: 0 }); const resourceUtilization = { memoryUtilization: totalAgentCapacity.memory > 0 ? currentUsage.memory / totalAgentCapacity.memory : 0, cpuUtilization: totalAgentCapacity.cpu > 0 ? currentUsage.cpu / totalAgentCapacity.cpu : 0, agentUtilization: totalAgentCapacity.agents > 0 ? currentUsage.agents / totalAgentCapacity.agents : 0 }; return { totalTasksExecuted: totalExecutions, runningTasks, queuedTasks, failedTasks, averageExecutionTime, successRate, resourceUtilization, throughput: { tasksPerHour: this.calculateTasksPerHour(), batchesPerHour: this.calculateBatchesPerHour(), parallelismFactor: this.calculateParallelismFactor() } }; } /** * Get active executions */ getActiveExecutions(): TaskExecution[] { return Array.from(this.activeExecutions.values()); } /** * Get task execution status by task ID */ async getTaskExecutionStatus(taskId: string): Promise<{ status: ExecutionStatus; message?: string; executionId?: string } | null> { try { logger.debug({ taskId }, 'Getting task execution status'); // Search through active executions to find matching task for (const execution of this.activeExecutions.values()) { if (execution.scheduledTask.task.id === taskId) { const result = { status: execution.status, executionId: execution.metadata.executionId, message: this.getExecutionStatusMessage(execution) }; logger.debug({ taskId, status: execution.status, executionId: execution.metadata.executionId }, 'Found task execution status'); return result; } } // Check if task was recently completed (search execution batches) for (const batch of this.executionBatches.values()) { const execution = batch.executions.get(taskId); if (execution) { const result = { status: execution.status, executionId: execution.metadata.executionId, message: this.getExecutionStatusMessage(execution) }; logger.debug({ taskId, status: execution.status, executionId: execution.metadata.executionId }, 'Found task execution status in batch'); return result; } } logger.debug({ taskId }, 'No execution status found for task'); return null; } catch (error) { logger.error({ err: error, taskId }, 'Failed to get task execution status'); return null; } } /** * Get coordinator running status */ public getRunningStatus(): boolean { return this.isRunning; } /** * Cancel a task execution */ async cancelExecution(executionId: string): Promise<boolean> { const execution = this.activeExecutions.get(executionId); if (!execution) { return false; } if (execution.status === 'running') { // Signal cancellation to agent await this.signalCancellation(execution); } execution.endTime = new Date(); // Update status with proper synchronization this.updateExecutionStatus(execution, 'cancelled'); // Call lifecycle hook for execution cancellation await this.callLifecycleHook('onExecutionCancelled', execution); // Release resource locks await this.releaseExecutionLocks(executionId); // Update agent status this.updateAgentAfterTaskCompletion(execution.agent, execution); logger.info('Task execution cancelled', { executionId, taskId: execution.scheduledTask.task.id }); return true; } /** * Release all locks associated with an execution */ private async releaseExecutionLocks(executionId: string): Promise<void> { const lockIds = this.activeLocks.get(executionId); if (!lockIds || lockIds.length === 0) { return; } let releasedCount = 0; for (const lockId of lockIds) { try { await this.securityEngine.releaseLock(lockId); releasedCount++; } catch (error) { logger.error('Failed to release lock', { lockId, executionId, error: error instanceof Error ? error.message : String(error) }); } } this.activeLocks.delete(executionId); logger.debug('Released execution locks', { executionId, totalLocks: lockIds.length, releasedCount }); } /** * Retry a failed task execution */ async retryExecution(executionId: string): Promise<TaskExecution | null> { const execution = this.activeExecutions.get(executionId); if (!execution || execution.status !== 'failed') { return null; } if (execution.metadata.retryCount >= this.config.maxRetryAttempts) { logger.warn('Maximum retry attempts reached', { executionId, retryCount: execution.metadata.retryCount }); return null; } // Reset execution state execution.status = 'queued'; execution.metadata.retryCount++; execution.metadata.lastRetryAt = new Date(); execution.startTime = new Date(); execution.endTime = undefined; execution.result = undefined; logger.info('Retrying task execution', { executionId, retryCount: execution.metadata.retryCount }); try { await this.runTaskExecution(execution); // Check if execution failed after running if ((execution.status as ExecutionStatus) === 'failed') { logger.error('Task retry execution failed', { executionId, error: (execution.result as unknown as { error?: string })?.error || 'Unknown error' }); } return execution; } catch (error) { execution.status = 'failed'; execution.result = { success: false, error: error instanceof Error ? error.message : String(error) }; return execution; } } /** * Dispose of the execution coordinator */ async dispose(): Promise<void> { // Prevent multiple disposal calls if (this.isDisposed) { return; } this.isDisposed = true; await this.stop(); // Clear all collections this.agents.clear(); this.activeExecutions.clear(); this.executionBatches.clear(); this.executionQueue = []; // Clear active locks this.activeLocks.clear(); // Reset singleton if this is the current instance if (ExecutionCoordinator.instance === this) { ExecutionCoordinator.instance = null; } logger.info('ExecutionCoordinator disposed'); } /** * Check if coordinator is disposed */ isDisposed = false; // Private helper methods /** * Main coordination loop */ private async coordinationLoop(): Promise<void> { if (!this.isRunning) { return; } try { // Process execution queue await this.processExecutionQueue(); // Check for ready batches from scheduler await this.checkForReadyBatches(); // Monitor running executions await this.monitorRunningExecutions(); // Handle timeouts and failures await this.handleTimeoutsAndFailures(); } catch (error) { logger.error('Error in coordination loop', { error: error instanceof Error ? error.message : String(error) }); } } /** * Process the execution queue */ private async processExecutionQueue(): Promise<void> { if (this.executionQueue.length === 0) { return; } const availableAgents = Array.from(this.agents.values()) .filter(agent => agent.status === 'idle'); if (availableAgents.length === 0) { return; } // Execute queued tasks const tasksToExecute = this.executionQueue.splice(0, availableAgents.length); for (const scheduledTask of tasksToExecute) { try { await this.executeTask(scheduledTask); } catch (error) { logger.error('Failed to execute queued task', { taskId: scheduledTask.task.id, error: error instanceof Error ? error.message : String(error) }); } } } /** * Check for ready batches from scheduler */ private async checkForReadyBatches(): Promise<void> { const nextBatch = this.taskScheduler.getNextExecutionBatch(); if (!nextBatch) { return; } // Check if we can execute this batch const currentSchedule = this.taskScheduler.getCurrentSchedule(); if (!currentSchedule) { return; } const scheduledTasks = nextBatch.taskIds .map(taskId => currentSchedule.scheduledTasks.get(taskId)) .filter(task => task !== undefined) as ScheduledTask[]; if (this.canExecuteBatch({ parallelBatch: nextBatch, executions: new Map(), status: 'pending', startTime: new Date(), resourceAllocation: this.calculateBatchResourceAllocation(scheduledTasks) })) { try { await this.executeBatch(nextBatch); } catch (error) { logger.error('Failed to execute batch', { batchId: nextBatch.batchId, error: error instanceof Error ? error.message : String(error) }); } } } /** * Monitor running executions */ private async monitorRunningExecutions(): Promise<void> { const runningExecutions = Array.from(this.activeExecutions.values()) .filter(execution => execution.status === 'running'); for (const execution of runningExecutions) { // Update resource usage await this.updateExecutionResourceUsage(execution); // Check for completion if (await this.isExecutionComplete(execution)) { await this.completeExecution(execution); } } } /** * Handle timeouts and failures */ private async handleTimeoutsAndFailures(): Promise<void> { const now = new Date(); const timeoutMs = this.config.taskTimeoutMinutes * 60 * 1000; for (const execution of this.activeExecutions.values()) { if (execution.status === 'running') { const elapsedMs = now.getTime() - execution.startTime.getTime(); if (elapsedMs > timeoutMs) { logger.warn('Task execution timeout', { executionId: execution.metadata.executionId, taskId: execution.scheduledTask.task.id, elapsedMinutes: elapsedMs / (1000 * 60) }); execution.status = 'timeout'; execution.metadata.timeoutCount++; if (this.config.enableAutoRecovery && execution.metadata.retryCount < this.config.maxRetryAttempts) { // Auto-retry after delay setTimeout(() => { this.retryExecution(execution.metadata.executionId); }, this.config.retryDelaySeconds * 1000); } } } } } /** * Execute tasks in a batch */ private async executeTasksInBatch(executionBatch: ExecutionBatch): Promise<void> { const executions = Array.from(executionBatch.executions.values()); // Start all tasks in parallel const executionPromises = executions.map(execution => this.runTaskExecution(execution).catch(error => { logger.error('Task execution failed in batch', { executionId: execution.metadata.executionId, batchId: executionBatch.parallelBatch.batchId, error: error instanceof Error ? error.message : String(error) }); execution.status = 'failed'; execution.result = { success: false, error: error instanceof Error ? error.message : String(error) }; }) ); // Wait for all tasks to complete await Promise.allSettled(executionPromises); // Update batch status const allCompleted = executions.every(e => e.status === 'completed' || e.status === 'failed' || e.status === 'cancelled' ); if (allCompleted) { const hasFailures = executions.some(e => e.status === 'failed'); executionBatch.status = hasFailures ? 'partial' : 'completed'; executionBatch.endTime = new Date(); } } /** * Run a single task execution with performance monitoring */ private async runTaskExecution(execution: TaskExecution): Promise<void> { const operationId = `task-execution-${execution.metadata.executionId}`; // Start performance tracking if (this.performanceMonitor) { this.performanceMonitor.startOperation(operationId); } execution.status = 'running'; execution.startTime = new Date(); // Update agent status this.updateAgentBeforeTaskExecution(execution.agent, execution); // Call lifecycle hook for execution start await this.callLifecycleHook('onExecutionStart', execution); // Apply execution delay if configured await this.applyExecutionDelay(execution.metadata.executionId); // Wait for execution to be unpaused if paused await this.waitForExecutionUnpause(execution.metadata.executionId); try { // Execute task using real agent communication via UniversalAgentCommunicationChannel const result = await this.executeTaskWithAgent(execution); execution.endTime = new Date(); execution.actualDuration = (execution.endTime.getTime() - execution.startTime.getTime()) / (1000 * 60 * 60); execution.result = result; // Update status with proper synchronization this.updateExecutionStatus(execution, 'completed'); // Call lifecycle hook for execution completion await this.callLifecycleHook('onExecutionComplete', execution); // Update task status in scheduler await this.taskScheduler.markTaskCompleted(execution.scheduledTask.task.id); logger.info('Task execution completed', { executionId: execution.metadata.executionId, taskId: execution.scheduledTask.task.id, duration: execution.actualDuration }); } catch (error) { execution.endTime = new Date(); execution.actualDuration = (execution.endTime.getTime() - execution.startTime.getTime()) / (1000 * 60 * 60); execution.result = { success: false, error: error instanceof Error ? error.message : String(error) }; // Update status with proper synchronization this.updateExecutionStatus(execution, 'failed'); // Call lifecycle hook for execution failure await this.callLifecycleHook('onExecutionFailed', execution, error instanceof Error ? error : new Error(String(error))); // Log the error but don't re-throw to allow the execution to complete with failed status logger.error('Task execution failed', { executionId: execution.metadata.executionId, taskId: execution.scheduledTask.task.id, error: error instanceof Error ? error.message : String(error) }); } finally { // Release resource locks await this.releaseExecutionLocks(execution.metadata.executionId); // End performance tracking if (this.performanceMonitor) { const duration = this.performanceMonitor.endOperation(operationId, { taskId: execution.scheduledTask.task.id, agentId: execution.agent.id, status: execution.status, success: (execution.status as ExecutionStatus) === 'completed' }); // Log performance if it exceeds target if (duration > 50) { // Epic 6.2 target logger.warn('Task execution exceeded performance target', { operationId, duration, target: 50, taskId: execution.scheduledTask.task.id }); } } // Update agent status this.updateAgentAfterTaskCompletion(execution.agent, execution); } } /** * Execute task using real agent communication via UniversalAgentCommunicationChannel */ private async executeTaskWithAgent(execution: TaskExecution): Promise<{ success: boolean; output?: string; exitCode?: number; }> { const task = execution.scheduledTask.task; const agent = execution.agent; logger.info({ taskId: task.id, agentId: agent.id, executionId: execution.metadata.executionId }, 'Starting real task execution with agent'); try { // Import UniversalAgentCommunicationChannel dynamically to avoid circular dependencies const { AgentOrchestrator } = await import('./agent-orchestrator.js'); const orchestrator = AgentOrchestrator.getInstance(); // Get the communication channel from orchestrator using public method const communicationChannel = orchestrator.getCommunicationChannel(); if (!communicationChannel) { throw new Error('Communication channel not available'); } // Prepare task payload for agent const taskPayload = JSON.stringify({ taskId: task.id, title: task.title, description: task.description, type: task.type, priority: task.priority, estimatedHours: task.estimatedHours, acceptanceCriteria: task.acceptanceCriteria, tags: task.tags, projectId: task.projectId, dependencies: task.dependencies, executionId: execution.metadata.executionId, timestamp: Date.now() }); // Send task to agent via universal communication channel const taskSent = await communicationChannel.sendTask(agent.id, taskPayload); if (!taskSent) { throw new Error(`Failed to send task to agent ${agent.id}`); } logger.info({ taskId: task.id, agentId: agent.id, executionId: execution.metadata.executionId }, 'Task sent to agent successfully'); // Apply additional delay before waiting for response (for testing) await this.applyExecutionDelay(execution.metadata.executionId); // Check if execution is paused before waiting for response await this.waitForExecutionUnpause(execution.metadata.executionId); // Wait for agent response with timeout const timeoutMs = this.config.taskTimeoutMinutes * 60 * 1000; const response = await this.waitForAgentResponse(execution, timeoutMs); if (!response) { throw new Error(`Agent ${agent.id} did not respond within timeout`); } // Parse agent response const result = this.parseAgentResponse(response); logger.info({ taskId: task.id, agentId: agent.id, executionId: execution.metadata.executionId, success: result.success }, 'Task execution completed with agent response'); return result; } catch (error) { logger.error({ err: error, taskId: task.id, agentId: agent.id, executionId: execution.metadata.executionId }, 'Task execution failed with agent'); throw error; } } /** * Wait for agent response with timeout and polling */ private async waitForAgentResponse(execution: TaskExecution, timeoutMs: number): Promise<string | null> { const startTime = Date.now(); const pollInterval = 5000; // Poll every 5 seconds const agent = execution.agent; logger.debug({ agentId: agent.id, executionId: execution.metadata.executionId, timeoutMs }, 'Waiting for agent response'); try { // Import communication channel const { AgentOrchestrator } = await import('./agent-orchestrator.js'); const orchestrator = AgentOrchestrator.getInstance(); const communicationChannel = orchestrator.getCommunicationChannel(); while (Date.now() - startTime < timeoutMs) { // Check if execution was cancelled if (execution.status === 'cancelled') { logger.info({ agentId: agent.id, executionId: execution.metadata.executionId }, 'Task execution was cancelled while waiting for response'); return null; } // Try to receive response from agent try { const response = await communicationChannel.receiveResponse(agent.id, pollInterval); if (response) { logger.debug({ agentId: agent.id, executionId: execution.metadata.executionId, responseLength: response.length }, 'Received response from agent'); return response; } } catch (error) { // Continue polling on receive errors logger.debug({ err: error, agentId: agent.id, executionId: execution.metadata.executionId }, 'No response received, continuing to poll'); } // Wait before next poll await new Promise(resolve => setTimeout(resolve, pollInterval)); } logger.warn({ agentId: agent.id, executionId: execution.metadata.executionId, elapsedMs: Date.now() - startTime }, 'Agent response timeout'); return null; } catch (error) { logger.error({ err: error, agentId: agent.id, executionId: execution.metadata.executionId }, 'Error while waiting for agent response'); return null; } } /** * Parse agent response into execution result */ private parseAgentResponse(response: string): { success: boolean; output?: string; exitCode?: number; } { try { // Try to parse as JSON first const parsed = JSON.parse(response); if (typeof parsed === 'object' && parsed !== null) { return { success: parsed.success === true, output: parsed.output || parsed.message || response, exitCode: parsed.exitCode || (parsed.success ? 0 : 1) }; } } catch { // Not JSON, treat as plain text logger.debug({ response: response.substring(0, 100) }, 'Agent response is not JSON, treating as plain text'); } // Default parsing for plain text responses const success = !response.toLowerCase().includes('error') && !response.toLowerCase().includes('failed') && !response.toLowerCase().includes('failure'); return { success, output: response, exitCode: success ? 0 : 1 }; } /** * Select best agent for a task */ private selectAgent(scheduledTask: ScheduledTask): Agent | null { const availableAgents = Array.from(this.agents.values()) .filter(agent => agent.status === 'idle' || agent.status === 'busy'); if (availableAgents.length === 0) { return null; } switch (this.config.loadBalancingStrategy) { case 'round_robin': return this.selectAgentRoundRobin(availableAgents); case 'least_loaded': return this.selectAgentLeastLoaded(availableAgents); case 'resource_aware': return this.selectAgentResourceAware(availableAgents, scheduledTask); case 'priority_based': return this.selectAgentPriorityBased(availableAgents, scheduledTask); default: return availableAgents[0]; } } // Agent selection strategies private selectAgentRoundRobin(agents: Agent[]): Agent { // Simple round-robin selection const index = Date.now() % agents.length; return agents[index]; } private selectAgentLeastLoaded(agents: Agent[]): Agent { return agents.reduce((least, current) => current.currentUsage.activeTasks < least.currentUsage.activeTasks ? current : least ); } private selectAgentResourceAware(agents: Agent[], scheduledTask: ScheduledTask): Agent { const requiredMemory = scheduledTask.assignedResources.memoryMB; const requiredCpu = scheduledTask.assignedResources.cpuWeight; // Filter agents that can handle the resource requirements const capableAgents = agents.filter(agent => (agent.capacity.maxMemoryMB - agent.currentUsage.memoryMB) >= requiredMemory && (agent.capacity.maxCpuWeight - agent.currentUsage.cpuWeight) >= requiredCpu && agent.currentUsage.activeTasks < agent.capacity.maxConcurrentTasks ); if (capableAgents.length === 0) { return agents[0]; // Fallback to first available } // Select agent with most available resources return capableAgents.reduce((best, current) => { const bestAvailable = (best.capacity.maxMemoryMB - best.currentUsage.memoryMB) + (best.capacity.maxCpuWeight - best.currentUsage.cpuWeight); const currentAvailable = (current.capacity.maxMemoryMB - current.currentUsage.memoryMB) + (current.capacity.maxCpuWeight - current.currentUsage.cpuWeight); return currentAvailable > bestAvailable ? current : best; }); } private selectAgentPriorityBased(agents: Agent[], scheduledTask: ScheduledTask): Agent { const taskPriority = scheduledTask.metadata.priorityScore; // For high priority tasks, prefer agents with better performance if (taskPriority > 0.8) { return agents.reduce((best, current) => current.metadata.successRate > best.metadata.successRate ? current : best ); } // For normal tasks, use least loaded strategy return this.selectAgentLeastLoaded(agents); } // Resource management methods private calculateBatchResourceAllocation(scheduledTasks: ScheduledTask[]): { totalMemoryMB: number; totalCpuWeight: number; agentCount: number; } { const totalMemoryMB = scheduledTasks.reduce((sum, task) => sum + task.assignedResources.memoryMB, 0); const totalCpuWeight = scheduledTasks.reduce((sum, task) => sum + task.assignedResources.cpuWeight, 0); const agentCount = new Set(scheduledTasks.map(task => task.assignedResources.agentId)).size; return { totalMemoryMB, totalCpuWeight, agentCount }; } private canExecuteBatch(executionBatch: ExecutionBatch): boolean { const { totalMemoryMB, totalCpuWeight, agentCount } = executionBatch.resourceAllocation; // Get agents that can actually handle tasks (not offline) const activeAgents = Array.from(this.agents.values()) .filter(agent => agent.status !== 'offline'); if (activeAgents.length === 0) { logger.debug('No active agents available for batch execution'); return false; } // Check if we have enough agents for the batch if (agentCount > activeAgents.length) { logger.debug('Insufficient agent count', { required: agentCount, available: activeAgents.length }); return false; } // Check individual agent capacity constraints const scheduledTasks = this.getScheduledTasksForBatch(executionBatch); const agentAssignments = this.simulateAgentAssignments(scheduledTasks, activeAgents); if (!agentAssignments) { logger.debug('Cannot assign tasks to agents due to capacity constraints'); return false; } // Check total resource availability as a final validation const totalCapacity = activeAgents.reduce((sum, agent) => ({ memory: sum.memory + agent.capacity.maxMemoryMB, cpu: sum.cpu + agent.capacity.maxCpuWeight }), { memory: 0, cpu: 0 }); const currentUsage = activeAgents.reduce((sum, agent) => ({ memory: sum.memory + agent.currentUsage.memoryMB, cpu: sum.cpu + agent.currentUsage.cpuWeight }), { memory: 0, cpu: 0 }); const availableMemory = totalCapacity.memory - currentUsage.memory; const availableCpu = totalCapacity.cpu - currentUsage.cpu; const hasResources = availableMemory >= totalMemoryMB && availableCpu >= totalCpuWeight; if (!hasResources) { logger.debug('Insufficient total resources', { requiredMemory: totalMemoryMB, availableMemory, requiredCpu: totalCpuWeight, availableCpu }); } return hasResources; } /** * Get scheduled tasks for a batch */ private getScheduledTasksForBatch(executionBatch: ExecutionBatch): ScheduledTask[] { const currentSchedule = this.taskScheduler.getCurrentSchedule(); if (!currentSchedule) { return []; } return executionBatch.parallelBatch.taskIds .map(taskId => currentSchedule.scheduledTasks.get(taskId)) .filter(task => task !== undefined) as ScheduledTask[]; } /** * Simulate agent assignments to check if batch is feasible */ private simulateAgentAssignments(scheduledTasks: ScheduledTask[], agents: Agent[]): boolean { // Create a copy of agent states for simulation const agentStates = agents.map(agent => ({ id: agent.id, availableMemory: agent.capacity.maxMemoryMB - agent.currentUsage.memoryMB, availableCpu: agent.capacity.maxCpuWeight - agent.currentUsage.cpuWeight, availableSlots: agent.capacity.maxConcurrentTasks - agent.currentUsage.activeTasks })); // Try to assign each task to an agent for (const task of scheduledTasks) { const requiredMemory = task.assignedResources.memoryMB; const requiredCpu = task.assignedResources.cpuWeight; // Find an agent that can handle this task const suitableAgent = agentStates.find(agent => agent.availableMemory >= requiredMemory && agent.availableCpu >= requiredCpu && agent.availableSlots > 0 ); if (!suitableAgent) { // Cannot assign this task to any agent return false; } // "Assign" the task to this agent (update simulation state) suitableAgent.availableMemory -= requiredMemory; suitableAgent.availableCpu -= requiredCpu; suitableAgent.availableSlots -= 1; } return true; // All tasks can be assigned } // Agent status management private updateAgentBeforeTaskExecution(agent: Agent, execution: TaskExecution): void { agent.status = 'busy'; agent.currentUsage.memoryMB += execution.scheduledTask.assignedResources.memoryMB; agent.currentUsage.cpuWeight += execution.scheduledTask.assignedResources.cpuWeight; agent.currentUsage.activeTasks++; agent.metadata.lastHeartbeat = new Date(); } private updateAgentAfterTaskCompletion(agent: Agent, execution: TaskExecution): void { agent.currentUsage.memoryMB = Math.max(0, agent.currentUsage.memoryMB - execution.scheduledTask.assignedResources.memoryMB); agent.currentUsage.cpuWeight = Math.max(0, agent.currentUsage.cpuWeight - execution.scheduledTask.assignedResources.cpuWeight); agent.currentUsage.activeTasks = Math.max(0, agent.currentUsage.activeTasks - 1); // Update agent statistics agent.metadata.totalTasksExecuted++; if (execution.actualDuration) { const currentAvg = agent.metadata.averageExecutionTime; const totalTasks = agent.metadata.totalTasksExecuted; agent.metadata.averageExecutionTime = (currentAvg * (totalTasks - 1) + execution.actualDuration) / totalTasks; } // Update success rate const wasSuccessful = execution.status === 'completed'; const currentRate = agent.metadata.successRate; const totalTasks = agent.metadata.totalTasksExecuted; agent.metadata.successRate = (currentRate * (totalTasks - 1) + (wasSuccessful ? 1 : 0)) / totalTasks; // Update agent status agent.status = agent.currentUsage.activeTasks > 0 ? 'busy' : 'idle'; agent.metadata.lastHeartbeat = new Date(); } // Monitoring and utility methods private async updateExecutionResourceUsage(execution: TaskExecution): Promise<void> { // In real implementation, this would query the agent for current resource usage // For simulation, we'll use the assigned resources execution.resourceUsage.peakMemoryMB = Math.max( execution.resourceUsage.peakMemoryMB, execution.scheduledTask.assignedResources.memoryMB ); execution.resourceUsage.averageCpuWeight = execution.scheduledTask.assignedResources.cpuWeight; } private async isExecutionComplete(execution: TaskExecution): Promise<boolean> { // In real implementation, this would check with the agent // For simulation, we'll check if the execution has a result return execution.result !== undefined; } private async completeExecution(execution: TaskExecution): Promise<void> { execution.status = execution.result?.success ? 'completed' : 'failed'; execution.endTime = new Date(); if (execution.endTime && execution.startTime) { execution.actualDuration = (execution.endTime.getTime() - execution.startTime.getTime()) / (1000 * 60 * 60); } // Release resource locks await this.releaseExecutionLocks(execution.metadata.executionId); // Remove from active executions this.activeExecutions.delete(execution.metadata.executionId); logger.info('Execution completed', { executionId: execution.metadata.executionId, taskId: execution.scheduledTask.task.id, status: execution.status, duration: execution.actualDuration }); } private async signalCancellation(execution: TaskExecution): Promise<void> { // In real implementation, this would send cancellation signal to agent logger.info('Signaling cancellation to agent', { executionId: execution.metadata.executionId, agentId: execution.agent.id }); } private async cancelAllExecutions(): Promise<void> { const activeExecutionIds = Array.from(this.activeExecutions.keys()); for (const executionId of activeExecutionIds) { await this.cancelExecution(executionId); } } private cancelTasksOnAgent(agentId: string): void { const agentExecutions = Array.from(this.activeExecutions.values()) .filter(execution => execution.agent.id === agentId); for (const execution of agentExecutions) { execution.status = 'cancelled'; execution.endTime = new Date(); this.activeExecutions.delete(execution.metadata.executionId); } } private monitorResources(): void { // Update agent heartbeats and detect offline agents const now = new Date(); const heartbeatTimeoutMs = this.config.agentHeartbeatInterval * 2 * 1000; for (const agent of this.agents.values()) { const timeSinceHeartbeat = now.getTime() - agent.metadata.lastHeartbeat.getTime(); if (timeSinceHeartbeat > heartbeatTimeoutMs && agent.status !== 'offline') { logger.warn('Agent heartbeat timeout', { agentId: agent.id, timeSinceHeartbeat: timeSinceHeartbeat / 1000 }); agent.status = 'offline'; this.cancelTasksOnAgent(agent.id); } } } // Metrics calculation methods private calculateTasksPerHour(): number { // Simple calculation based on recent completions const oneHourAgo = new Date(Date.now() - 60 * 60 * 1000); const recentCompletions = Array.from(this.activeExecutions.values()) .filter(e => e.status === 'completed' && e.endTime && e.endTime > oneHourAgo); return recentCompletions.length; } private calculateBatchesPerHour(): number { // Simple calculation based on recent batch completions const oneHourAgo = new Date(Date.now() - 60 * 60 * 1000); const recentBatches = Array.from(this.executionBatches.values()) .filter(b => b.status === 'completed' && b.endTime && b.endTime > oneHourAgo); return recentBatches.length; } private calculateParallelismFactor(): number { const runningTasks = Array.from(this.activeExecutions.values()) .filter(e => e.status === 'running').length; const totalAgents = this.agents.size; return totalAgents > 0 ? runningTasks / totalAgents : 0; } /** * Get human-readable status message for an execution */ private getExecutionStatusMessage(execution: TaskExecution): string { const { status, startTime, endTime, result, metadata } = execution; const now = new Date(); switch (status) { case 'queued': return 'Task is queued for execution'; case 'running': { const elapsedMs = now.getTime() - startTime.getTime(); const elapsedMinutes = Math.round(elapsedMs / (1000 * 60)); return `Task is running (${elapsedMinutes} minutes elapsed)`; } case 'completed': { const duration = endTime ? Math.round((endTime.getTime() - startTime.getTime()) / (1000 * 60)) : 0; return `Task completed successfully in ${duration} minutes`; } case 'failed': { const failureReason = result?.error || 'Unknown error'; const retryInfo = metadata.retryCount > 0 ? ` (${metadata.retryCount} retries)` : ''; return `Task failed: ${failureReason}${retryInfo}`; } case 'cancelled': return 'Task was cancelled'; case 'timeout': { const timeoutInfo = metadata.timeoutCount > 0 ? ` (${metadata.timeoutCount} timeouts)` : ''; return `Task timed out${timeoutInfo}`; } default: return `Task status: ${status}`; } } /** * Optimize batch processing for better performance */ async optimizeBatchProcessing(): Promise<void> { logger.info('Starting batch processing optimization'); try { // Optimize queue processing await this.optimizeExecutionQueue(); // Optimize agent utilization await this.optimizeAgentUtilization(); // Clean up completed executions await this.cleanupCompletedExecutions(); logger.info('Batch processing optimization completed'); } catch (error) { logger.error({ err: error }, 'Batch processing optimization failed'); throw error; } } /** * Optimize execution queue processing */ private async optimizeExecutionQueue(): Promise<void> { if (this.executionQueue.length === 0) { return; } // Sort queue by priority and estimated duration this.executionQueue.sort((a, b) => { const priorityWeight = this.getPriorityWeight(a.task.priority) - this.getPriorityWeight(b.task.priority); if (priorityWeight !== 0) return priorityWeight; // If same priority, prefer shorter tasks return a.task.estimatedHours - b.task.estimatedHours; }); // Group similar tasks for batch processing const taskGroups = this.groupSimilarTasks(this.executionQueue); // Process groups in optimal order for (const group of taskGroups) { if (group.length > 1) { logger.debug({ groupSize: group.length, taskType: group[0].task.type }, 'Processing task group'); } } logger.debug({ queueSize: this.executionQueue.length, groups: taskGroups.length }, 'Execution queue optimized'); } /** * Optimize agent utilization */ private async optimizeAgentUtilization(): Promise<void> { const idleAgents = Array.from(this.agents.values()).filter(agent => agent.status === 'idle'); const busyAgents = Array.from(this.agents.values()).filter(agent => agent.status === 'busy'); // Rebalance tasks if some agents are overloaded for (const busyAgent of busyAgents) { if (busyAgent.currentUsage.activeTasks > busyAgent.capacity.maxConcurrentTasks * 0.8 && idleAgents.length > 0) { // Try to redistribute some tasks const redistributableTasks = Math.floor(busyAgent.currentUsage.activeTasks * 0.3); for (let i = 0; i < redistributableTasks && idleAgents.length > 0; i++) { const execution = Array.from(this.activeExecutions.values()) .find(exec => exec.agent.id === busyAgent.id && exec.status === 'queued'); if (execution) { // Reassign to idle agent const idleAgent = idleAgents.shift(); if (idleAgent) { execution.agent = idleAgent; busyAgent.currentUsage.activeTasks--; idleAgent.currentUsage.activeTasks++; idleAgent.status = 'busy'; logger.debug({ taskId: execution.scheduledTask.task.id, fromAgent: busyAgent.id, toAgent: idleAgent.id }, 'Task redistributed for load balancing'); } } } } } logger.debug({ idleAgents: idleAgents.length, busyAgents: busyAgents.length }, 'Agent utilization optimized'); } /** * Clean up completed executions to free memory */ private async cleanupCompletedExecutions(): Promise<void> { const cutoffTime = Date.now() - (60 * 60 * 1000); // 1 hour ago let cleanedCount = 0; // Clean up old completed executions for (const [executionId, execution] of this.activeExecutions.entries()) { if ((execution.status === 'completed' || execution.status === 'failed') && execution.endTime && execution.endTime.getTime() < cutoffTime) { this.activeExecutions.delete(executionId); cleanedCount++; } } // Clean up old execution batches for (const [batchId, batch] of this.executionBatches.entries()) { if ((batch.status === 'completed' || batch.status === 'failed') && batch.endTime && batch.endTime.getTime() < cutoffTime) { this.executionBatches.delete(batchId); cleanedCount++; } } logger.debug({ cleanedCount }, 'Completed executions cleaned up'); } /** * Group similar tasks for batch processing */ private groupSimilarTasks(tasks: ScheduledTask[]): ScheduledTask[][] { const groups = new Map<string, ScheduledTask[]>(); for (const task of tasks) { const groupKey = `${task.task.type}_${task.task.priority}`; if (!groups.has(groupKey)) { groups.set(groupKey, []); } groups.get(groupKey)!.push(task); } return Array.from(groups.values()); } /** * Get priority weight for sorting */ private getPriorityWeight(priority: string): number { switch (priority) { case 'critical': return 0; case 'high': return 1; case 'medium': return 2; case 'low': return 3; default: return 4; } } // Execution State Synchronization Methods /** * Register a callback for execution state changes */ onExecutionStateChange(callback: ExecutionStateChangeCallback): void { this.stateChangeCallbacks.push(callback); } /** * Remove a callback for execution state changes */ removeExecutionStateChangeCallback(callback: ExecutionStateChangeCallback): void { const index = this.stateChangeCallbacks.indexOf(callback); if (index > -1) { this.stateChangeCallbacks.splice(index, 1); } } /** * Set execution lifecycle hooks */ setLifecycleHooks(hooks: ExecutionLifecycleHooks): void { this.lifecycleHooks = { ...hooks }; } /** * Clear all lifecycle hooks */ clearLifecycleHooks(): void { this.lifecycleHooks = {}; } /** * Call a lifecycle hook if it exists */ private async callLifecycleHook( hookName: keyof ExecutionLifecycleHooks, execution: TaskExecution, error?: Error ): Promise<void> { const hook = this.lifecycleHooks[hookName]; if (!hook) { return; } try { if (hookName === 'onExecutionFailed' && error) { await (hook as (execution: TaskExecution, error: Error) => Promise<void>)(execution, error); } else if (hookName === 'onExecutionProgress') { // Calculate progress based on execution time vs estimated time const elapsed = Date.now() - execution.startTime.getTime(); const estimated = (execution.scheduledTask.task.estimatedHours || 1) * 60 * 60 * 1000; const progress = Math.min(elapsed / estimated, 1.0); await (hook as ExecutionLifecycleHooks['onExecutionProgress'])?.(execution, progress); } else if (hookName === 'onExecutionFailed') { // onExecutionFailed requires an error parameter - this should be called with the actual error // For now, we'll skip this hook if no error is provided return; } else if (hookName === 'onExecutionStart' || hookName === 'onExecutionComplete' || hookName === 'onExecutionCancelled') { await (hook as (execution: TaskExecution) => Promise<void> | void)(execution); } } catch (hookError) { logger.warn('Error in lifecycle hook', { hookName, executionId: execution.metadata.executionId, error: hookError instanceof Error ? hookError.message : String(hookError) }); } } /** * Notify all callbacks of execution state change */ private notifyExecutionStateChange( execution: TaskExecution, previousStatus: ExecutionStatus, newStatus: ExecutionStatus ): void { if (!this.config.enableExecutionStateEvents) { return; } // Update internal state tracking this.executionStateSync.set(execution.metadata.executionId, newStatus); const event: ExecutionStateChangeEvent = { executionId: execution.metadata.executionId, taskId: execution.scheduledTask.task.id, agentId: execution.agent.id, previousStatus, newStatus, timestamp: new Date(), metadata: { retryCount: execution.metadata.retryCount, timeoutCount: execution.metadata.timeoutCount } }; // Notify all registered callbacks for (const callback of this.stateChangeCallbacks) { try { callback(event); } catch (error) { logger.warn('Error in execution state change callback', { executionId: execution.metadata.executionId, error: error instanceof Error ? error.message : String(error) }); } } logger.debug('Execution state changed', { executionId: execution.metadata.executionId, taskId: execution.scheduledTask.task.id, previousStatus, newStatus, callbackCount: this.stateChangeCallbacks.length }); } /** * Update execution status with proper synchronization */ private updateExecutionStatus(execution: TaskExecution, newStatus: ExecutionStatus): void { const previousStatus = execution.status; execution.status = newStatus; // Notify state change this.notifyExecutionStateChange(execution, previousStatus, newStatus); // Handle completion with retention if (newStatus === 'completed' || newStatus === 'failed' || newStatus === 'cancelled') { this.handleExecutionCompletion(execution); } } /** * Handle execution completion with retention */ private handleExecutionCompletion(execution: TaskExecution): void { // Move to completed executions for retention this.completedExecutions.set(execution.metadata.executionId, execution); // Remove from active executions after a short delay to allow state tracking setTimeout(() => { this.activeExecutions.delete(execution.metadata.executionId); }, 1000); // 1 second delay // Schedule cleanup based on retention policy setTimeout(() => { this.completedExecutions.delete(execution.metadata.executionId); }, this.config.executionRetentionMinutes * 60 * 1000); } /** * Get execution by ID (checks both active and completed) */ getExecution(executionId: string): TaskExecution | undefined { return this.activeExecutions.get(executionId) || this.completedExecutions.get(executionId); } /** * Get all executions (active and recently completed) */ getAllExecutions(): TaskExecution[] { return [ ...Array.from(this.activeExecutions.values()), ...Array.from(this.completedExecutions.values()) ]; } /** * Get execution state synchronization status */ getExecutionSyncStatus(): { activeCount: number; completedCount: number; syncedCount: number; callbackCount: number; } { return { activeCount: this.activeExecutions.size, completedCount: this.completedExecutions.size, syncedCount: this.executionStateSync.size, callbackCount: this.stateChangeCallbacks.length }; } // Execution Delay Control Methods /** * Set execution delay for a specific execution */ setExecutionDelay(executionId: string, delayMs: number): void { this.executionDelays.set(executionId, delayMs); logger.debug('Execution delay set', { executionId, delayMs }); } /** * Pause an execution (prevents it from completing) */ pauseExecution(executionId: string): void { this.executionPauses.set(executionId, true); logger.debug('Execution paused', { executionId }); } /** * Resume a paused execution */ resumeExecution(executionId: string): void { this.executionPauses.set(executionId, false); logger.debug('Execution resumed', { executionId }); } /** * Check if execution is paused */ isExecutionPaused(executionId: string): boolean { return this.executionPauses.get(executionId) === true; } /** * Clear all execution delays and pauses */ clearExecutionControls(): void { this.executionDelays.clear(); this.executionPauses.clear(); logger.debug('All execution controls cleared'); } /** * Apply execution delay if configured */ private async applyExecutionDelay(executionId: string): Promise<void> { if (!this.config.enableExecutionDelays) { return; } // Check for specific delay for this execution let delayMs = this.executionDelays.get(executionId); // Use default delay if no specific delay set if (delayMs === undefined) { delayMs = this.config.defaultExecutionDelayMs; } if (delayMs > 0) { logger.debug('Applying execution delay', { executionId, delayMs }); await new Promise(resolve => setTimeout(resolve, delayMs)); } } /** * Wait for execution to be unpaused */ private async waitForExecutionUnpause(executionId: string): Promise<void> { while (this.isExecutionPaused(executionId)) { logger.debug('Execution is paused, waiting...', { executionId }); await new Promise(resolve => setTimeout(resolve, 100)); // Check every 100ms } } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/freshtechbro/vibe-coder-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server