Skip to main content
Glama
portel-dev

NCP - Natural Context Provider

by portel-dev
job-executor.ts (8.42 kB)
/**
 * Job Executor - Execute scheduled jobs
 * Lightweight execution context for cron-triggered jobs
 */

import { JobManager } from './job-manager.js';
import { ExecutionRecorder } from './execution-recorder.js';
import { CronManager } from './cron-manager.js';
import { logger } from '../../utils/logger.js';
import { v4 as uuidv4 } from 'uuid';

/**
 * Outcome of a single job execution as returned by {@link JobExecutor.executeJob}.
 */
export interface ExecutionResult {
  /** Unique identifier (UUID v4) generated for this execution attempt. */
  executionId: string;
  /** `timeout` is reported when the failure message contains the word "timeout". */
  status: 'success' | 'failure' | 'timeout';
  /** Content returned by the tool; only set when `status` is `success`. */
  result?: any;
  /** Human-readable failure reason; only set when `status` is not `success`. */
  error?: string;
  /** Wall-clock time in milliseconds measured from the start of `executeJob`. */
  duration: number;
}

/**
 * Runs scheduled jobs: loads the job definition, executes its tool through an
 * NCP orchestrator, records the execution and updates the job's metadata.
 */
export class JobExecutor {
  private jobManager: JobManager;
  private executionRecorder: ExecutionRecorder;
  private cronManager?: CronManager;
  private defaultTimeout: number = 5 * 60 * 1000; // 5 minutes
  /** Upper bound for the best-effort orchestrator cleanup on the failure path. */
  private cleanupTimeout: number = 30 * 1000; // 30 seconds

  constructor() {
    this.jobManager = new JobManager();
    this.executionRecorder = new ExecutionRecorder();

    // Only initialize cron manager on non-Windows platforms
    if (process.platform !== 'win32') {
      try {
        this.cronManager = new CronManager();
      } catch (error) {
        logger.warn(`[JobExecutor] Cron manager initialization failed: ${error instanceof Error ? error.message : String(error)}`);
      }
    } else {
      logger.warn('[JobExecutor] Cron manager not available on Windows');
    }
  }

  /**
   * Execute a scheduled job by ID.
   *
   * This method does not reject for job-level failures: a missing job, an
   * inactive job, a tool error or a timeout are all reported through the
   * returned {@link ExecutionResult}.
   *
   * @param jobId - Identifier of the job to run.
   * @param timeout - Maximum tool run time in milliseconds. A falsy value
   *   (omitted or `0`) selects the default of 5 minutes.
   * @returns The execution outcome, including its id, status and duration.
   */
  async executeJob(jobId: string, timeout?: number): Promise<ExecutionResult> {
    const startTime = Date.now();
    const executionId = uuidv4();
    // `||` is intentional: a timeout of 0 falls back to the default instead of
    // making every execution time out immediately.
    const executionTimeout = timeout || this.defaultTimeout;

    logger.debug(`[JobExecutor] executeJob called with jobId: ${jobId}`);
    logger.info(`[JobExecutor] Starting execution ${executionId} for job ${jobId}`);

    // Load job
    const job = this.jobManager.getJob(jobId);
    if (!job) {
      const error = `Job ${jobId} not found`;
      logger.error(`[JobExecutor] ${error}`);
      return {
        executionId,
        status: 'failure',
        error,
        duration: Date.now() - startTime
      };
    }

    // Check if job is active
    if (job.status !== 'active') {
      const error = `Job ${job.name} is not active (status: ${job.status})`;
      logger.warn(`[JobExecutor] ${error}`);
      return {
        executionId,
        status: 'failure',
        error,
        duration: Date.now() - startTime
      };
    }

    // Start execution recording
    this.executionRecorder.startExecution({
      executionId,
      jobId: job.id,
      jobName: job.name,
      tool: job.tool,
      parameters: job.parameters,
      startedAt: new Date().toISOString(),
      status: 'running'
    });

    // Track original working directory for restoration
    const originalCwd = process.cwd();

    // Set once an orchestrator exists and reset right before the regular
    // cleanup runs, so the failure path can release an orchestrator that would
    // otherwise be leaked without ever cleaning up twice.
    let pendingCleanup: (() => unknown) | undefined;

    try {
      logger.info(`[JobExecutor] Executing ${job.tool} with parameters: ${JSON.stringify(job.parameters)}`);

      // Change to home directory to use global ~/.ncp config
      // Scheduled jobs are system-level automation and should use global MCP configuration
      const { homedir } = await import('os');
      const homeDirectory = homedir();
      if (homeDirectory !== originalCwd) {
        process.chdir(homeDirectory);
        logger.debug(`[JobExecutor] Changed working directory from ${originalCwd} to ${homeDirectory} for global MCP access`);
      }

      // For MCP tools like "apple-mcp:mail", use orchestrator with specific profile
      // Extract MCP name from tool (e.g., "apple-mcp" from "apple-mcp:mail")
      const mcpName = job.tool.includes(':') ? job.tool.split(':')[0] : null;
      logger.debug(`[JobExecutor] Tool: ${job.tool}, MCP: ${mcpName || 'none'}`);

      // Create orchestrator with specific MCP profile
      logger.debug('[JobExecutor] Importing NCPOrchestrator...');
      const { NCPOrchestrator } = await import('../../orchestrator/ncp-orchestrator.js');
      logger.debug('[JobExecutor] Creating orchestrator instance...');
      const orchestrator = new NCPOrchestrator(mcpName || 'all', true); // Silent mode
      pendingCleanup = () => orchestrator.cleanup();
      logger.debug('[JobExecutor] Orchestrator created');

      // Initialize and wait for background init to complete (with timeout)
      logger.debug('[JobExecutor] Calling initialize()...');
      await orchestrator.initialize();
      logger.debug('[JobExecutor] Initialize complete');

      // Wait for initialization with 30 second timeout
      logger.debug('[JobExecutor] Waiting for initialization (30s timeout)...');
      await this.executeWithTimeout(
        () => orchestrator.waitForInitialization(),
        30000
      );
      logger.debug('[JobExecutor] Initialization wait complete');

      // Execute tool with timeout
      logger.debug(`[JobExecutor] Executing tool ${job.tool}...`);
      const execResult = await this.executeWithTimeout<any>(
        () => orchestrator.run(job.tool, job.parameters),
        executionTimeout
      );
      logger.debug('[JobExecutor] Tool execution complete');

      // Cleanup orchestrator (clear the marker first so a failing cleanup is
      // not retried by the catch block below)
      pendingCleanup = undefined;
      await orchestrator.cleanup();

      // Restore original working directory
      process.chdir(originalCwd);
      logger.debug(`[JobExecutor] Restored working directory to ${originalCwd}`);

      if (!execResult.success) {
        throw new Error(execResult.error || 'Tool execution failed');
      }

      const result = execResult.content;

      // Record successful execution
      const duration = Date.now() - startTime;
      this.executionRecorder.completeExecution(executionId, 'success', result);

      // Update job metadata
      this.jobManager.recordExecution(jobId, executionId, new Date().toISOString());

      // Check if job should be removed from cron (fireOnce, maxExecutions, endDate)
      const updatedJob = this.jobManager.getJob(jobId);
      if (updatedJob && updatedJob.status === 'completed' && this.cronManager) {
        logger.info(`[JobExecutor] Job ${job.name} completed, removing from cron`);
        this.cronManager.removeJob(jobId);
      }

      logger.info(`[JobExecutor] Execution ${executionId} completed successfully in ${duration}ms`);

      return {
        executionId,
        status: 'success',
        result,
        duration
      };
    } catch (error) {
      const duration = Date.now() - startTime;
      const isTimeout = error instanceof Error && error.message.includes('timeout');
      const status = isTimeout ? 'timeout' : 'failure';
      const errorMessage = error instanceof Error ? error.message : String(error);

      logger.error(`[JobExecutor] Execution ${executionId} failed: ${errorMessage}`);

      // Release the orchestrator if the failure happened before the regular
      // cleanup. Best effort and time-bounded: a broken or hung cleanup must
      // not mask the original error or block the failure result.
      if (pendingCleanup) {
        const cleanup = pendingCleanup;
        pendingCleanup = undefined;
        try {
          await this.executeWithTimeout(async () => cleanup(), this.cleanupTimeout);
        } catch (cleanupError) {
          logger.warn(`[JobExecutor] Orchestrator cleanup failed: ${cleanupError instanceof Error ? cleanupError.message : String(cleanupError)}`);
        }
      }

      // Restore original working directory
      try {
        process.chdir(originalCwd);
        logger.debug(`[JobExecutor] Restored working directory to ${originalCwd} after error`);
      } catch (chdirError) {
        logger.warn(`[JobExecutor] Failed to restore working directory: ${chdirError}`);
      }

      // Record failed execution
      this.executionRecorder.completeExecution(
        executionId,
        status,
        undefined,
        { message: errorMessage }
      );

      // Mark job as errored if it's not a timeout
      if (!isTimeout) {
        this.jobManager.markJobAsErrored(jobId, errorMessage);
      }

      return {
        executionId,
        status,
        error: errorMessage,
        duration
      };
    }
  }

  /**
   * Execute a function with timeout.
   *
   * Rejects with `Execution timeout after <n>ms` if `fn` has not settled
   * within `timeoutMs`. The work started by `fn` is not cancelled; only the
   * wait is abandoned.
   *
   * @param fn - Factory for the promise to wait on.
   * @param timeoutMs - Maximum time to wait, in milliseconds.
   * @returns The value `fn` resolves with.
   */
  private async executeWithTimeout<T>(
    fn: () => Promise<T>,
    timeoutMs: number
  ): Promise<T> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeoutPromise = new Promise<never>((_, reject) => {
      timer = setTimeout(
        () => reject(new Error(`Execution timeout after ${timeoutMs}ms`)),
        timeoutMs
      );
    });

    try {
      return await Promise.race([fn(), timeoutPromise]);
    } finally {
      // Always clear the timer: a pending timeout keeps the Node event loop
      // alive and would stop a cron-triggered process from exiting until it fires.
      clearTimeout(timer);
    }
  }

  /**
   * Clean up old executions based on retention policy.
   *
   * Delegates to the execution recorder and logs the outcome: a warning per
   * reported error, otherwise the number of deleted executions.
   *
   * @param maxAgeDays - Retention age in days passed to the recorder (default 30).
   * @param maxExecutionsPerJob - Per-job execution cap passed to the recorder (default 100).
   */
  async cleanupOldExecutions(maxAgeDays: number = 30, maxExecutionsPerJob: number = 100): Promise<void> {
    logger.info(`[JobExecutor] Starting cleanup: maxAge=${maxAgeDays} days, maxPerJob=${maxExecutionsPerJob}`);

    const result = this.executionRecorder.cleanupOldExecutions(maxAgeDays, maxExecutionsPerJob);

    if (result.errors.length > 0) {
      logger.warn(`[JobExecutor] Cleanup completed with ${result.errors.length} errors`);
      result.errors.forEach(err => logger.warn(`  - ${err}`));
    } else {
      logger.info(`[JobExecutor] Cleanup completed successfully: deleted ${result.deletedCount} executions`);
    }
  }
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/portel-dev/ncp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.