Skip to main content
Glama

Orchestrator MCP

steps.ts•8.12 kB
/** * Step execution logic * Extracted from ai/workflow.ts to match planned structure */ import type { RoutingDecision } from '../router.js'; import type { OrchestratorManager } from '../../orchestrator/manager.js'; import type { WorkflowStepResult, WorkflowContext } from './context.js'; import { toolTracker } from '../toolTracker.js'; import { createLogger } from '../../utils/logging.js'; const logger = createLogger('workflow-steps'); /** * Step execution options */ export interface StepExecutionOptions { timeout?: number; retryAttempts?: number; retryDelay?: number; continueOnFailure?: boolean; } /** * Step executor for workflow automation */ export class WorkflowStepExecutor { constructor(private orchestrator: OrchestratorManager) {} /** * Execute a single workflow step */ async executeStep( step: RoutingDecision, stepIndex: number, context: WorkflowContext, options: StepExecutionOptions = {} ): Promise<WorkflowStepResult> { const startTime = Date.now(); logger.debug('Executing workflow step', { step: step.selectedTool, index: stepIndex, confidence: step.confidence }); // Track tool execution const executionId = toolTracker.startToolExecution(step.selectedTool, step.parameters); try { // Prepare parameters with context data const enrichedParameters = this.enrichParameters(step.parameters, context); // Execute the tool with timeout, passing the execution ID to avoid duplicate tracking const result = await this.executeWithTimeout( step.selectedTool, enrichedParameters, options.timeout || 30000, executionId ); const executionTime = Date.now() - startTime; // Mark execution as successful toolTracker.endToolExecution(executionId, true, result); logger.debug('Step executed successfully', { step: step.selectedTool, executionTime, resultSize: JSON.stringify(result).length }); return { stepIndex, tool: step.selectedTool, success: true, result, executionTime, metadata: { confidence: step.confidence, reasoning: step.reasoning, parameters: enrichedParameters, }, }; } catch (error) { const executionTime = Date.now() - startTime; const errorMessage = error instanceof Error ? error.message : String(error); // Mark execution as failed toolTracker.endToolExecution(executionId, false, undefined, errorMessage); logger.error('Step execution failed', error as Error, { step: step.selectedTool, executionTime }); // Retry logic if (options.retryAttempts && options.retryAttempts > 0) { logger.info('Retrying step execution', { step: step.selectedTool, attemptsLeft: options.retryAttempts }); if (options.retryDelay) { await this.delay(options.retryDelay); } return this.executeStep(step, stepIndex, context, { ...options, retryAttempts: options.retryAttempts - 1, }); } return { stepIndex, tool: step.selectedTool, success: false, error: errorMessage, executionTime, metadata: { confidence: step.confidence, reasoning: step.reasoning, parameters: step.parameters, }, }; } } /** * Execute multiple steps in sequence */ async executeSteps( steps: RoutingDecision[], context: WorkflowContext, options: StepExecutionOptions = {} ): Promise<WorkflowStepResult[]> { const results: WorkflowStepResult[] = []; for (let i = 0; i < steps.length; i++) { const step = steps[i]; logger.info('Executing workflow step', { step: i + 1, total: steps.length, tool: step.selectedTool }); const result = await this.executeStep(step, i, context, options); results.push(result); // Update context with result context.results.push(result); // Check if we should continue on failure if (!result.success && !options.continueOnFailure) { logger.warn('Stopping workflow due to step failure', { failedStep: step.selectedTool, stepIndex: i }); break; } // Add small delay between steps to avoid overwhelming servers if (i < steps.length - 1) { await this.delay(100); } } return results; } /** * Execute steps in parallel (with concurrency limit) */ async executeStepsParallel( steps: RoutingDecision[], context: WorkflowContext, options: StepExecutionOptions & { concurrency?: number } = {} ): Promise<WorkflowStepResult[]> { const concurrency = options.concurrency || 3; const results: WorkflowStepResult[] = []; // Execute steps in batches for (let i = 0; i < steps.length; i += concurrency) { const batch = steps.slice(i, i + concurrency); logger.info('Executing workflow batch', { batch: Math.floor(i / concurrency) + 1, batchSize: batch.length, totalSteps: steps.length }); const batchPromises = batch.map((step, batchIndex) => this.executeStep(step, i + batchIndex, context, options) ); const batchResults = await Promise.allSettled(batchPromises); batchResults.forEach((result, batchIndex) => { if (result.status === 'fulfilled') { results.push(result.value); context.results.push(result.value); } else { const failedResult: WorkflowStepResult = { stepIndex: i + batchIndex, tool: batch[batchIndex].selectedTool, success: false, error: result.reason?.message || 'Unknown error', executionTime: 0, }; results.push(failedResult); context.results.push(failedResult); } }); } return results; } /** * Enrich parameters with context data */ private enrichParameters( parameters: Record<string, any>, context: WorkflowContext ): Record<string, any> { const enriched = { ...parameters }; // Replace template variables with context data const contextData = this.extractContextData(context); for (const [key, value] of Object.entries(enriched)) { if (typeof value === 'string' && value.includes('{{')) { enriched[key] = this.replaceTemplateVariables(value, contextData); } } return enriched; } /** * Extract relevant data from context for parameter enrichment */ private extractContextData(context: WorkflowContext): Record<string, any> { const data: Record<string, any> = { originalRequest: context.originalRequest, ...context.variables, }; // Add results from previous steps context.results.forEach((result, index) => { if (result.success && result.result) { data[`step_${index}_result`] = result.result; data[`${result.tool}_result`] = result.result; } }); return data; } /** * Replace template variables in strings */ private replaceTemplateVariables(template: string, data: Record<string, any>): string { return template.replace(/\{\{(\w+)\}\}/g, (match, key) => { return data[key] !== undefined ? String(data[key]) : match; }); } /** * Execute tool with timeout */ private async executeWithTimeout( tool: string, parameters: Record<string, any>, timeoutMs: number, executionId?: string ): Promise<any> { return Promise.race([ this.orchestrator.callTool(tool, parameters, executionId), new Promise((_, reject) => setTimeout(() => reject(new Error(`Tool execution timeout: ${tool}`)), timeoutMs) ), ]); } /** * Delay execution */ private delay(ms: number): Promise<void> { return new Promise(resolve => setTimeout(resolve, ms)); } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Phoenixrr2113/Orchestrator-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server