Skip to main content
Glama

Orchestrator MCP

engine.ts•9.25 kB
/** * Core workflow execution engine * Extracted from ai/workflow.ts to match planned structure */ import { AIClient } from '../client.js'; import { IntelligentRouter, type RoutingDecision } from '../router.js'; import type { OrchestratorManager } from '../../orchestrator/manager.js'; import { WorkflowContextManager, type WorkflowContext } from './context.js'; import { WorkflowStepExecutor, type StepExecutionOptions } from './steps.js'; import { WorkflowFailureHandler, type RecoveryOptions } from './handlers.js'; import { synthesizeResults } from '../synthesis.js'; import { toolTracker } from '../toolTracker.js'; import { createLogger } from '../../utils/logging.js'; const logger = createLogger('workflow-engine'); /** * Workflow execution result */ export interface WorkflowExecutionResult { success: boolean; finalResult: string; context: WorkflowContext; metadata: { totalSteps: number; successfulSteps: number; failedSteps: number; executionTime: number; recoveryAttempts: number; }; } /** * Workflow execution options */ export interface WorkflowExecutionOptions extends StepExecutionOptions { parallel?: boolean; concurrency?: number; failureStrategy?: RecoveryOptions; maxRecoveryAttempts?: number; } /** * Core workflow automation engine */ export class WorkflowEngine { private contextManager: WorkflowContextManager; private stepExecutor: WorkflowStepExecutor; private failureHandler: WorkflowFailureHandler; constructor( private aiClient: AIClient, private router: IntelligentRouter, private orchestrator: OrchestratorManager ) { this.contextManager = new WorkflowContextManager(); this.stepExecutor = new WorkflowStepExecutor(orchestrator); this.failureHandler = new WorkflowFailureHandler(); } /** * Execute a complete AI-driven workflow */ async executeWorkflow( userRequest: string, options: WorkflowExecutionOptions = {} ): Promise<WorkflowExecutionResult> { const startTime = Date.now(); let recoveryAttempts = 0; const maxRecoveryAttempts = options.maxRecoveryAttempts || 2; logger.info('Starting workflow execution', { request: userRequest }); // Start tool tracking session const sessionId = toolTracker.startSession(userRequest); try { // Plan the workflow using AI const steps = await this.planWorkflow(userRequest); if (steps.length === 0) { throw new Error('No workflow steps could be planned for this request'); } // Create execution context const context = this.contextManager.createContext(userRequest, steps); this.contextManager.updateStatus(context, 'running'); // Execute workflow with recovery let executionResult = await this.executeWithRecovery(context, options); // Handle partial failures with recovery attempts while (!executionResult.success && recoveryAttempts < maxRecoveryAttempts) { recoveryAttempts++; logger.info('Attempting workflow recovery', { attempt: recoveryAttempts }); const recoveryResult = await this.attemptRecovery(context, options); if (recoveryResult.canRecover) { executionResult = await this.executeWithRecovery(context, options); } else { break; } } // Generate final result const finalResult = await this.generateFinalResult(context, executionResult.success); // Update final status this.contextManager.updateStatus( context, executionResult.success ? 'completed' : 'failed' ); const summary = this.contextManager.getSummary(context); logger.info('Workflow execution completed', { success: executionResult.success, totalSteps: summary.totalSteps, successfulSteps: summary.successfulSteps, executionTime: summary.totalTime, recoveryAttempts }); // End tool tracking session toolTracker.endSession(executionResult.success ? 'completed' : 'failed'); return { success: executionResult.success, finalResult, context, metadata: { totalSteps: summary.totalSteps, successfulSteps: summary.successfulSteps, failedSteps: summary.failedSteps, executionTime: summary.totalTime, recoveryAttempts, }, }; } catch (error) { const executionTime = Date.now() - startTime; logger.error('Workflow execution failed', error as Error, { executionTime }); // End tool tracking session with failure toolTracker.endSession('failed'); return { success: false, finalResult: `Workflow execution failed: ${error}`, context: this.contextManager.createContext(userRequest, []), metadata: { totalSteps: 0, successfulSteps: 0, failedSteps: 1, executionTime, recoveryAttempts, }, }; } } /** * Plan workflow steps using AI */ private async planWorkflow(userRequest: string): Promise<RoutingDecision[]> { logger.debug('Planning workflow steps', { request: userRequest }); try { // Use the intelligent router to plan the workflow const availableServers = this.orchestrator.getConnectedServers(); const routingDecisions = await this.router.routeRequest(userRequest, availableServers); logger.debug('Workflow planned', { steps: routingDecisions.length, tools: routingDecisions.map(r => r.selectedTool) }); return routingDecisions; } catch (error) { logger.error('Failed to plan workflow', error as Error); throw new Error(`Workflow planning failed: ${error}`); } } /** * Execute workflow with built-in recovery */ private async executeWithRecovery( context: WorkflowContext, options: WorkflowExecutionOptions ): Promise<{ success: boolean; results: any[] }> { try { let results; if (options.parallel) { results = await this.stepExecutor.executeStepsParallel( context.steps, context, options ); } else { results = await this.stepExecutor.executeSteps( context.steps, context, options ); } const successfulResults = results.filter(r => r.success); const success = successfulResults.length === results.length; return { success, results: successfulResults.map(r => r.result) }; } catch (error) { logger.error('Workflow execution error', error as Error); return { success: false, results: [] }; } } /** * Attempt to recover from workflow failure */ private async attemptRecovery( context: WorkflowContext, options: WorkflowExecutionOptions ): Promise<{ canRecover: boolean }> { logger.info('Attempting workflow recovery'); try { const lastError = context.results .filter(r => !r.success) .pop()?.error; const recoveryResult = await this.failureHandler.handleWorkflowFailure( context, new Error(lastError || 'Unknown workflow failure') ); if (recoveryResult.canRecover && recoveryResult.recoveryPlan) { // Apply recovery plan if (recoveryResult.recoveryPlan.restartFromStep !== undefined) { // Remove results from failed step onwards context.results = context.results.slice(0, recoveryResult.recoveryPlan.restartFromStep); } if (recoveryResult.recoveryPlan.modifiedSteps) { context.steps = recoveryResult.recoveryPlan.modifiedSteps; } if (recoveryResult.recoveryPlan.fallbackWorkflow) { context.steps = recoveryResult.recoveryPlan.fallbackWorkflow; context.results = []; // Start fresh with fallback workflow } return { canRecover: true }; } return { canRecover: false }; } catch (error) { logger.error('Recovery attempt failed', error as Error); return { canRecover: false }; } } /** * Generate final result from workflow execution */ private async generateFinalResult( context: WorkflowContext, success: boolean ): Promise<string> { try { if (success) { // Synthesize successful results const toolResults = context.results.map(result => ({ tool: result.tool, result: result.result, success: result.success, error: result.error, })); return await synthesizeResults(this.aiClient, context.originalRequest, toolResults); } else { // Generate failure summary const failureSummary = this.failureHandler.generateFailureSummary(context); return `Workflow partially completed with ${failureSummary.successfulSteps}/${failureSummary.totalSteps} successful steps.\n\n` + `Failures: ${failureSummary.failureReasons.join(', ')}\n\n` + `Partial results available from successful steps.`; } } catch (error) { logger.error('Failed to generate final result', error as Error); return `Workflow completed but result synthesis failed: ${error}`; } } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Phoenixrr2113/Orchestrator-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server