Skip to main content
Glama
portel-dev

NCP - Natural Context Provider

by portel-dev
timing-executor.ts (14.1 kB)
/**
 * Timing Executor - Execute all active tasks for a timing group in parallel
 * Executes multiple tasks concurrently using isolated child processes for safety
 */

import { TaskManager } from './task-manager.js';
import { ExecutionRecorder } from './execution-recorder.js';
import { logger } from '../../utils/logger.js';
import { v4 as uuidv4 } from 'uuid';
import { ScheduledTask } from '../../types/scheduler.js';
import { spawn } from 'child_process';
import { execSync } from 'child_process';

/** Value returned by getNCPExecutablePath() when no `ncp` binary is found on PATH. */
const NPX_FALLBACK_COMMAND = 'npx ncp';

/** Grace period between SIGTERM and SIGKILL when a task exceeds its timeout. */
const FORCE_KILL_DELAY_MS = 5000;

/**
 * Outcome of running one scheduled task.
 */
export interface TaskExecutionResult {
  taskId: string;
  taskName: string;
  /** Freshly generated UUID identifying this run. */
  executionId: string;
  status: 'success' | 'failure' | 'timeout';
  /** Captured stdout (child-process path) or the tool's content (in-process path). */
  result?: any;
  /** Error description; set when status is 'failure' or 'timeout'. */
  error?: string;
  /** Wall-clock duration in milliseconds. */
  duration: number;
}

/**
 * Aggregated outcome of running every active task of a timing group.
 */
export interface TimingExecutionSummary {
  timingId: string;
  /** All tasks attached to the timing group, regardless of status. */
  totalTasks: number;
  /** Tasks whose status was 'active' when the run started. */
  activeTasks: number;
  /** Tasks for which a result was collected. */
  executedTasks: number;
  successfulTasks: number;
  /** Tasks that ended with any status other than 'success' (includes timeouts). */
  failedTasks: number;
  /** Tasks that were not active and therefore not run. */
  skippedTasks: number;
  results: TaskExecutionResult[];
  /** Wall-clock duration of the whole group run in milliseconds. */
  totalDuration: number;
}

export class TimingExecutor {
  private taskManager: TaskManager;
  private executionRecorder: ExecutionRecorder;
  private defaultTimeout: number = 5 * 60 * 1000; // 5 minutes

  constructor() {
    this.taskManager = new TaskManager();
    this.executionRecorder = new ExecutionRecorder();
  }

  /**
   * Execute a single task immediately
   *
   * @param taskId - Identifier of the task to run.
   * @param timeout - Optional timeout in milliseconds; falls back to the 5 minute default.
   * @returns The result of the run (failures are reported in the result, not thrown).
   * @throws Error when no task with the given id exists.
   */
  async executeSingleTask(taskId: string, timeout?: number): Promise<TaskExecutionResult> {
    const task = this.taskManager.getTask(taskId);
    if (!task) {
      throw new Error(`Task ${taskId} not found`);
    }

    logger.info(`[TimingExecutor] Executing single task: ${task.name} (${task.id})`);

    // Execute in child process for isolation
    return this.executeTaskInChildProcess(task, timeout);
  }

  /**
   * Execute all active tasks for a timing group in parallel
   *
   * @param timingId - Identifier of the timing group.
   * @param timeout - Optional per-task timeout in milliseconds.
   * @returns A summary with one result per executed task. An unknown timing group
   *   yields an all-zero summary instead of throwing.
   */
  async executeTimingGroup(timingId: string, timeout?: number): Promise<TimingExecutionSummary> {
    const startTime = Date.now();

    logger.info(`[TimingExecutor] Starting execution for timing group: ${timingId}`);

    // Load timing group
    const timing = this.taskManager.getTimingGroup(timingId);
    if (!timing) {
      logger.error(`[TimingExecutor] Timing group ${timingId} not found`);
      return {
        timingId,
        totalTasks: 0,
        activeTasks: 0,
        executedTasks: 0,
        successfulTasks: 0,
        failedTasks: 0,
        skippedTasks: 0,
        results: [],
        totalDuration: Date.now() - startTime
      };
    }

    // Get all tasks for this timing
    const allTasks = this.taskManager.getTasksForTiming(timingId);
    const activeTasks = allTasks.filter(task => task.status === 'active');

    logger.info(`[TimingExecutor] Timing ${timing.name}: ${activeTasks.length} active tasks out of ${allTasks.length} total`);

    if (activeTasks.length === 0) {
      logger.info(`[TimingExecutor] No active tasks to execute for timing ${timingId}`);
      return {
        timingId,
        totalTasks: allTasks.length,
        activeTasks: 0,
        executedTasks: 0,
        successfulTasks: 0,
        failedTasks: 0,
        skippedTasks: allTasks.length,
        results: [],
        totalDuration: Date.now() - startTime
      };
    }

    // Execute all active tasks in parallel using ISOLATED child processes
    // This ensures if one task crashes, it doesn't affect other tasks
    logger.info(`[TimingExecutor] Executing ${activeTasks.length} tasks in parallel (isolated processes)...`);

    const executionPromises = activeTasks.map(task =>
      this.executeTaskInChildProcess(task, timeout).catch(error => ({
        taskId: task.id,
        taskName: task.name,
        executionId: uuidv4(),
        status: 'failure' as const,
        error: error instanceof Error ? error.message : String(error),
        duration: 0
      }))
    );

    const results = await Promise.allSettled(executionPromises);

    // Process results
    const taskResults: TaskExecutionResult[] = [];
    let successfulTasks = 0;
    let failedTasks = 0;

    for (const result of results) {
      if (result.status === 'fulfilled') {
        taskResults.push(result.value);
        if (result.value.status === 'success') {
          successfulTasks++;
        } else {
          failedTasks++;
        }
      } else {
        // This shouldn't happen due to catch above, but handle it anyway
        failedTasks++;
        logger.error(`[TimingExecutor] Unexpected promise rejection: ${result.reason}`);
      }
    }

    const totalDuration = Date.now() - startTime;

    logger.info(
      `[TimingExecutor] Timing ${timingId} execution completed: ` +
      `${successfulTasks} successful, ${failedTasks} failed, ` +
      `${allTasks.length - activeTasks.length} skipped (${totalDuration}ms)`
    );

    return {
      timingId,
      totalTasks: allTasks.length,
      activeTasks: activeTasks.length,
      executedTasks: taskResults.length,
      successfulTasks,
      failedTasks,
      skippedTasks: allTasks.length - activeTasks.length,
      results: taskResults,
      totalDuration
    };
  }

  /**
   * Get the path to ncp executable
   *
   * @returns The path printed by `which ncp`, or the literal `'npx ncp'` when the
   *   lookup fails or prints nothing. The fallback is a command line, not a path;
   *   use resolveNCPCommand() to obtain something spawn() can run.
   */
  private getNCPExecutablePath(): string {
    try {
      // stderr is discarded so a missing binary does not print lookup noise
      // on the parent's stderr every time a task starts.
      const ncpPath = execSync('which ncp', {
        encoding: 'utf-8',
        stdio: ['ignore', 'pipe', 'ignore']
      }).trim();
      return ncpPath || NPX_FALLBACK_COMMAND;
    } catch {
      return NPX_FALLBACK_COMMAND;
    }
  }

  /**
   * Translate the executable lookup into a command plus leading arguments.
   * spawn() without a shell treats its first argument as a single executable name,
   * so the `npx ncp` fallback must be split into `npx` and the argument `ncp`.
   */
  private resolveNCPCommand(): { command: string; prefixArgs: string[] } {
    const ncpPath = this.getNCPExecutablePath();
    if (ncpPath === NPX_FALLBACK_COMMAND) {
      return { command: 'npx', prefixArgs: ['ncp'] };
    }
    return { command: ncpPath, prefixArgs: [] };
  }

  /**
   * Execute a single task in an isolated child process
   * This ensures if one task crashes, it doesn't affect other tasks
   *
   * The child is started as `<ncp> _task-execute <taskId>` with
   * NCP_TASK_EXECUTION=true in its environment. On timeout the child receives
   * SIGTERM, followed by SIGKILL if it has not exited after a grace period.
   *
   * @param task - Task to run.
   * @param timeout - Optional timeout in milliseconds; falls back to the default.
   * @returns A promise that always resolves (never rejects) with the task outcome.
   */
  private async executeTaskInChildProcess(task: ScheduledTask, timeout?: number): Promise<TaskExecutionResult> {
    const startTime = Date.now();
    const executionTimeout = timeout || this.defaultTimeout;

    logger.info(`[TimingExecutor] Starting task ${task.name} (${task.id}) in isolated child process`);

    return new Promise((resolve) => {
      const { command, prefixArgs } = this.resolveNCPCommand();

      // Spawn child process to execute the task
      // This provides complete isolation - if the task crashes, only this process dies
      const child = spawn(command, [...prefixArgs, '_task-execute', task.id], {
        stdio: ['ignore', 'pipe', 'pipe'],
        detached: false,
        env: {
          ...process.env,
          NCP_TASK_EXECUTION: 'true' // Flag to indicate this is task execution
        }
      });

      let stdout = '';
      let stderr = '';
      let timedOut = false;
      let settled = false;
      let forceKillHandle: ReturnType<typeof setTimeout> | undefined;

      // Both 'exit' and 'error' may fire for the same child; report only the first.
      const settle = (result: TaskExecutionResult): void => {
        if (settled) {
          return;
        }
        settled = true;
        resolve(result);
      };

      // Set up timeout
      const timeoutHandle = setTimeout(() => {
        timedOut = true;
        child.kill('SIGTERM');

        // Force kill after 5 seconds if still running.
        // child.killed only records that a signal was delivered (it is already
        // true after the SIGTERM above), so liveness is checked via exit state.
        forceKillHandle = setTimeout(() => {
          if (child.exitCode === null && child.signalCode === null) {
            child.kill('SIGKILL');
          }
        }, FORCE_KILL_DELAY_MS);
      }, executionTimeout);

      // Capture output
      child.stdout?.on('data', (data) => {
        stdout += data.toString();
      });

      child.stderr?.on('data', (data) => {
        stderr += data.toString();
      });

      // Handle process exit
      // NOTE(review): 'exit' can fire while the stdio streams are still open, so
      // trailing output may be missed; 'close' waits for the streams but could be
      // delayed by descendants holding the pipes - confirm before switching.
      child.on('exit', (code, signal) => {
        clearTimeout(timeoutHandle);
        // The process is gone; a pending SIGKILL escalation is no longer needed.
        clearTimeout(forceKillHandle);
        const duration = Date.now() - startTime;

        if (timedOut) {
          logger.warn(`[TimingExecutor] Task ${task.name} timed out after ${executionTimeout}ms`);
          settle({
            taskId: task.id,
            taskName: task.name,
            executionId: uuidv4(),
            status: 'timeout',
            error: `Task execution timeout after ${executionTimeout}ms`,
            duration
          });
        } else if (code === 0) {
          logger.info(`[TimingExecutor] Task ${task.name} completed successfully (${duration}ms)`);
          settle({
            taskId: task.id,
            taskName: task.name,
            executionId: uuidv4(),
            status: 'success',
            result: stdout,
            duration
          });
        } else {
          // Prefer what the child wrote to stderr; otherwise describe the exit.
          const errorMsg = stderr || (`Process exited with code ${code}` + (signal ? ` (signal: ${signal})` : ''));
          logger.error(`[TimingExecutor] Task ${task.name} failed: ${errorMsg}`);
          settle({
            taskId: task.id,
            taskName: task.name,
            executionId: uuidv4(),
            status: 'failure',
            error: errorMsg,
            duration
          });
        }
      });

      // Handle spawn errors
      child.on('error', (error) => {
        clearTimeout(timeoutHandle);
        const duration = Date.now() - startTime;

        logger.error(`[TimingExecutor] Failed to spawn process for task ${task.name}: ${error.message}`);
        settle({
          taskId: task.id,
          taskName: task.name,
          executionId: uuidv4(),
          status: 'failure',
          error: `Failed to spawn process: ${error.message}`,
          duration
        });
      });
    });
  }

  /**
   * Execute a single task (in-process - legacy, kept for backward compatibility)
   *
   * Records the run through the execution recorder, temporarily changes the
   * process working directory to the user's home directory, runs the tool through
   * an NCPOrchestrator and restores the working directory afterwards.
   * Note that process.chdir() affects the whole process, not just this call.
   *
   * @param task - Task to run.
   * @param timeout - Optional timeout in milliseconds for the tool run.
   * @returns The task outcome; errors are captured in the result, not thrown.
   * @deprecated Use executeTaskInChildProcess for better isolation
   */
  private async executeTask(task: ScheduledTask, timeout?: number): Promise<TaskExecutionResult> {
    const startTime = Date.now();
    const executionId = uuidv4();
    const executionTimeout = timeout || this.defaultTimeout;

    logger.info(`[TimingExecutor] Starting execution ${executionId} for task ${task.name} (${task.id})`);

    // Start execution recording
    this.executionRecorder.startExecution({
      executionId,
      jobId: task.id, // Use taskId for backward compatibility with execution recorder
      jobName: task.name,
      tool: task.tool,
      parameters: task.parameters,
      startedAt: new Date().toISOString(),
      status: 'running'
    });

    // Track original working directory for restoration
    const originalCwd = process.cwd();

    try {
      logger.info(`[TimingExecutor] Executing ${task.tool} with parameters: ${JSON.stringify(task.parameters)}`);

      // Change to home directory to use global ~/.ncp config
      const { homedir } = await import('os');
      const homeDirectory = homedir();

      if (homeDirectory !== originalCwd) {
        process.chdir(homeDirectory);
        logger.debug(`[TimingExecutor] Changed working directory from ${originalCwd} to ${homeDirectory}`);
      }

      // Extract MCP name from tool (e.g., "apple-mcp" from "apple-mcp:mail")
      const mcpName = task.tool.includes(':') ? task.tool.split(':')[0] : null;
      logger.debug(`[TimingExecutor] Tool: ${task.tool}, MCP: ${mcpName || 'none'}`);

      // Create orchestrator with specific MCP profile
      const { NCPOrchestrator } = await import('../../orchestrator/ncp-orchestrator.js');
      const orchestrator = new NCPOrchestrator(mcpName || 'all', true); // Silent mode

      let execResult: any;
      try {
        // Initialize and wait for background init to complete (with timeout)
        await orchestrator.initialize();
        await this.executeWithTimeout(
          () => orchestrator.waitForInitialization(),
          30000
        );

        // Execute tool with timeout
        execResult = await this.executeWithTimeout<any>(
          () => orchestrator.run(task.tool, task.parameters),
          executionTimeout
        );
      } catch (runError) {
        // Release the orchestrator even when init/run failed or timed out, and
        // keep the original error as the one that gets reported.
        try {
          await orchestrator.cleanup();
        } catch (cleanupError) {
          logger.warn(`[TimingExecutor] Failed to clean up orchestrator: ${cleanupError}`);
        }
        throw runError;
      }

      // Cleanup orchestrator
      await orchestrator.cleanup();

      // Restore original working directory
      process.chdir(originalCwd);

      if (!execResult.success) {
        throw new Error(execResult.error || 'Tool execution failed');
      }

      const result = execResult.content;

      // Record successful execution
      const duration = Date.now() - startTime;
      this.executionRecorder.completeExecution(executionId, 'success', result);

      // Update task metadata
      this.taskManager.recordExecution(task.id, executionId, new Date().toISOString());

      logger.info(`[TimingExecutor] Task ${task.name} execution ${executionId} completed successfully in ${duration}ms`);

      return {
        taskId: task.id,
        taskName: task.name,
        executionId,
        status: 'success',
        result,
        duration
      };
    } catch (error) {
      const duration = Date.now() - startTime;
      // Timeouts are recognised by the message produced in executeWithTimeout.
      const isTimeout = error instanceof Error && error.message.includes('timeout');
      const status = isTimeout ? 'timeout' : 'failure';
      const errorMessage = error instanceof Error ? error.message : String(error);

      logger.error(`[TimingExecutor] Task ${task.name} execution ${executionId} failed: ${errorMessage}`);

      // Restore original working directory
      try {
        process.chdir(originalCwd);
      } catch (chdirError) {
        logger.warn(`[TimingExecutor] Failed to restore working directory: ${chdirError}`);
      }

      // Record failed execution
      this.executionRecorder.completeExecution(
        executionId,
        status,
        undefined,
        { message: errorMessage }
      );

      // Mark task as errored if it's not a timeout
      if (!isTimeout) {
        this.taskManager.markTaskAsErrored(task.id, errorMessage);
      }

      return {
        taskId: task.id,
        taskName: task.name,
        executionId,
        status,
        error: errorMessage,
        duration
      };
    }
  }

  /**
   * Execute a function with timeout
   *
   * The underlying work is not cancelled when the timeout wins; only the returned
   * promise rejects. The timer is always cleared once the race is decided so it
   * does not linger after the work has finished.
   *
   * @param fn - Factory producing the promise to wait for.
   * @param timeoutMs - Maximum time to wait in milliseconds.
   * @returns The value produced by `fn`.
   * @throws Error with message `Execution timeout after <timeoutMs>ms` on timeout,
   *   or whatever `fn` rejects with.
   */
  private async executeWithTimeout<T>(
    fn: () => Promise<T>,
    timeoutMs: number
  ): Promise<T> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeoutPromise = new Promise<never>((_, reject) => {
      timer = setTimeout(() => reject(new Error(`Execution timeout after ${timeoutMs}ms`)), timeoutMs);
    });

    try {
      return await Promise.race([fn(), timeoutPromise]);
    } finally {
      clearTimeout(timer);
    }
  }

  /**
   * Clean up old executions based on retention policy
   *
   * Delegates to the execution recorder and logs the outcome.
   *
   * @param maxAgeDays - Maximum age in days passed to the recorder (default 30).
   * @param maxExecutionsPerTask - Maximum executions per task passed to the recorder (default 100).
   */
  async cleanupOldExecutions(maxAgeDays: number = 30, maxExecutionsPerTask: number = 100): Promise<void> {
    logger.info(`[TimingExecutor] Starting cleanup: maxAge=${maxAgeDays} days, maxPerTask=${maxExecutionsPerTask}`);

    const result = this.executionRecorder.cleanupOldExecutions(maxAgeDays, maxExecutionsPerTask);

    if (result.errors.length > 0) {
      logger.warn(`[TimingExecutor] Cleanup completed with ${result.errors.length} errors`);
      result.errors.forEach(err => logger.warn(`  - ${err}`));
    } else {
      logger.info(`[TimingExecutor] Cleanup completed successfully: deleted ${result.deletedCount} executions`);
    }
  }
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/portel-dev/ncp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.