Skip to main content
Glama
workflow-runner.ts • 14.1 kB
import { ToolBuilder } from '@/packages/core/tools/tool-builder.js';
import { DataStore } from '@core/datastore/types.js';
import { WorkflowExecutor } from '@/packages/core/tools/tool-executor.js';
import { IntegrationManager } from '@core/integrations/integration-manager.js';
import { logEmitter, logMessage } from '@core/utils/logs.js';
import { Integration, Workflow, WorkflowResult } from '@superglue/client';
import { generateUniqueId } from '@superglue/shared/utils';
import { BaseWorkflowConfig } from './config-loader.js';
import { validateWorkflowResult, type SoftValidationResult } from './soft-validator.js';

/** Outcome of one build + execute cycle for a workflow. */
export interface WorkflowRunAttempt {
  /** 1-based index of this attempt within the run. */
  attemptNumber: number;
  /** Milliseconds spent in the build phase. */
  buildTime: number;
  buildSuccess: boolean;
  buildError?: string;
  /** Milliseconds spent in the execute phase (0 when the build failed). */
  executionTime: number;
  executionSuccess: boolean;
  executionError?: string;
  /** The workflow produced by the build phase, when it succeeded. */
  workflowPlan?: Workflow;
  /** The executor's result, when execution returned one. */
  result?: WorkflowResult;
}

/** Aggregated outcome of all attempts made for one workflow. */
export interface WorkflowRunResult {
  workflowId: string;
  workflowName: string;
  totalAttempts: number;
  successfulAttempts: number;
  /** successfulAttempts / totalAttempts, forced to 0 when soft validation fails. */
  successRate: number;
  attempts: WorkflowRunAttempt[];
  /** Result of the last successful attempt, if any. */
  finalResult?: WorkflowResult;
  /** Non-INFO log entries captured during the attempts (only when `collectLogs` is set). */
  collectedLogs?: any[];
  softValidation?: SoftValidationResult; // Result of soft validation if enabled
}

/** Options controlling how a workflow is run. */
export interface WorkflowRunnerOptions {
  maxAttemptsPerWorkflow: number;
  collectLogs?: boolean;
  saveRuns?: boolean;
  delayBetweenAttempts?: number; // Set to 0 for testing, use 1000-2000ms for production APIs to avoid rate limiting
  onAttemptComplete?: (attempt: WorkflowRunAttempt) => void;
  enableSoftValidation?: boolean; // Enable LLM-based validation of results
  expectedResult?: string; // Expected result for soft validation (description or JSON)
}

/**
 * Count API call failures from collected logs.
 *
 * An entry counts when its `level` is exactly 'WARN' and its `message` is a
 * string containing 'API call failed'. Null entries and non-string messages
 * are ignored instead of throwing.
 *
 * @param logs - collected log entries; a missing or null list counts as empty
 * @returns the number of matching entries
 */
export function countApiFailures(logs: any[] = []): number {
  return (logs ?? []).filter(
    (log) =>
      log?.level === 'WARN' &&
      typeof log.message === 'string' &&
      log.message.includes('API call failed')
  ).length;
}

/** Turn a caught value into a message: `Error.message` for errors, `String(value)` otherwise. */
function describeThrown(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/** Render a workflow result's `error` field for logging; never throws. */
function describeResultError(error: unknown): string {
  if (!error) {
    return 'Unknown error';
  }
  if (typeof error === 'string') {
    return error;
  }
  try {
    // JSON.stringify returns undefined for some values (e.g. functions) and
    // throws on circular structures; fall back to String() in both cases.
    return JSON.stringify(error) ?? String(error);
  } catch {
    return String(error);
  }
}

/**
 * Builds and executes workflows, optionally repeating each one several times
 * and collecting per-attempt timing and success information.
 */
export class WorkflowRunner {
  private readonly metadata: { orgId: string; userId: string };

  /**
   * @param datastore - used to check workflow id uniqueness and to persist runs
   * @param orgId - organisation id attached to log messages and datastore calls
   * @param userId - user id attached to log messages
   */
  constructor(
    private readonly datastore: DataStore,
    orgId: string = 'workflow-runner',
    userId: string = 'system'
  ) {
    this.metadata = { orgId, userId };
  }

  /**
   * Run a workflow with multiple attempts.
   *
   * Every attempt up to `options.maxAttemptsPerWorkflow` is executed (there is
   * no early exit on success). When soft validation is enabled and rejects the
   * final result, the success metrics are reset to zero.
   *
   * @param workflowConfig - the workflow to build and execute
   * @param integrations - integrations made available to the builder and executor
   * @param options - attempt count, delays, log collection and validation settings
   * @returns aggregated statistics plus the individual attempts
   */
  async runWorkflow(
    workflowConfig: BaseWorkflowConfig,
    integrations: Integration[],
    options: WorkflowRunnerOptions
  ): Promise<WorkflowRunResult> {
    const attempts: WorkflowRunAttempt[] = [];
    const collectedLogs: any[] = [];
    let successfulAttempts = 0;
    let finalResult: WorkflowResult | undefined;

    // Set up log collection if requested; INFO entries are not collected.
    const logListener = options.collectLogs
      ? (entry: any) => {
          if (entry.level !== 'INFO') {
            collectedLogs.push(entry);
          }
        }
      : undefined;

    if (logListener) {
      logEmitter.on('log', logListener);
    }

    try {
      for (let attemptNum = 1; attemptNum <= options.maxAttemptsPerWorkflow; attemptNum++) {
        logMessage(
          'info',
          `🔄 Starting attempt ${attemptNum}/${options.maxAttemptsPerWorkflow} for workflow: ${workflowConfig.name}`,
          this.metadata
        );

        const attempt = await this.runSingleAttempt(
          workflowConfig,
          integrations,
          attemptNum,
          options.saveRuns ?? false
        );
        attempts.push(attempt);

        if (attempt.executionSuccess && attempt.result) {
          successfulAttempts++;
          finalResult = attempt.result;
          logMessage(
            'info',
            `✅ Workflow ${workflowConfig.name} succeeded on attempt ${attemptNum}`,
            this.metadata
          );
        } else {
          logMessage(
            'warn',
            `⚠️ Workflow ${workflowConfig.name} failed on attempt ${attemptNum}`,
            this.metadata
          );
        }

        // Call hook if provided
        if (options.onAttemptComplete) {
          options.onAttemptComplete(attempt);
        }

        // Add delay between attempts if not the last one
        if (attemptNum < options.maxAttemptsPerWorkflow && options.delayBetweenAttempts) {
          logMessage(
            'info',
            `⏳ Waiting ${options.delayBetweenAttempts}ms before next attempt...`,
            this.metadata
          );
          await new Promise((resolve) => setTimeout(resolve, options.delayBetweenAttempts));
        }
      }
    } finally {
      // Always detach the listener, even when an attempt or the hook throws.
      if (logListener) {
        logEmitter.off('log', logListener);
      }
    }

    // Derive the statistics from the attempts actually made so that a
    // configured maximum of 0 yields a rate of 0 rather than NaN.
    const totalAttempts = attempts.length;
    let successRate = totalAttempts > 0 ? successfulAttempts / totalAttempts : 0;

    logMessage(
      'info',
      `📊 Workflow ${workflowConfig.name} completed: ${successfulAttempts}/${totalAttempts} successful (${(successRate * 100).toFixed(1)}% success rate)`,
      this.metadata
    );

    // Perform soft validation if enabled and we have a result
    let softValidation: SoftValidationResult | undefined;
    if (options.enableSoftValidation && options.expectedResult && finalResult?.data) {
      try {
        logMessage('info', `🎯 Running soft validation for ${workflowConfig.name}...`, this.metadata);

        softValidation = await validateWorkflowResult(
          finalResult.data,
          options.expectedResult,
          workflowConfig.instruction,
          this.metadata
        );

        logMessage(
          'info',
          `🎯 Soft validation result: ${softValidation.success ? '✅ PASS' : '❌ FAIL'}`,
          this.metadata
        );

        if (!softValidation.success) {
          // A rejected result overrides the execution-based metrics.
          successfulAttempts = 0;
          successRate = 0;
          finalResult = undefined;
          logMessage(
            'warn',
            `⚠️ Soft validation failed - marking workflow as failed despite execution success`,
            this.metadata
          );
        }
      } catch (error) {
        // A validator failure is logged but leaves the execution metrics untouched.
        logMessage(
          'error',
          `❌ Soft validation error for ${workflowConfig.name}: ${error}`,
          this.metadata
        );
      }
    }

    return {
      workflowId: workflowConfig.id,
      workflowName: workflowConfig.name,
      totalAttempts,
      successfulAttempts,
      successRate,
      attempts,
      finalResult,
      collectedLogs: options.collectLogs ? collectedLogs : undefined,
      softValidation
    };
  }

  /**
   * Run a single attempt of building and executing a workflow.
   *
   * Build and execution errors are recorded on the returned attempt rather
   * than thrown. Execution is skipped when the build fails.
   */
  private async runSingleAttempt(
    workflowConfig: BaseWorkflowConfig,
    integrations: Integration[],
    attemptNumber: number,
    saveRun: boolean
  ): Promise<WorkflowRunAttempt> {
    const attempt: WorkflowRunAttempt = {
      attemptNumber,
      buildTime: 0,
      buildSuccess: false,
      executionTime: 0,
      executionSuccess: false
    };

    const workflow = await this.buildPhase(workflowConfig, integrations, attempt);
    if (!workflow) {
      return attempt;
    }

    const workflowResult = await this.executePhase(workflow, workflowConfig, integrations, attempt);
    if (!workflowResult) {
      return attempt;
    }

    // Save run if requested
    if (saveRun) {
      await this.persistRun(workflowResult, workflow, workflowConfig.name);
    }

    if (attempt.executionSuccess) {
      logMessage(
        'info',
        `✅ Execution successful for ${workflowConfig.name} in ${attempt.executionTime}ms`,
        this.metadata
      );
    } else {
      const errorMsg = describeResultError(workflowResult.error);
      attempt.executionError = errorMsg;
      logMessage(
        'warn',
        `❌ Execution failed for ${workflowConfig.name}: ${errorMsg}`,
        this.metadata
      );
    }

    return attempt;
  }

  /** Build the workflow and give it a unique id; records timing/outcome on `attempt`, returns undefined on failure. */
  private async buildPhase(
    workflowConfig: BaseWorkflowConfig,
    integrations: Integration[],
    attempt: WorkflowRunAttempt
  ): Promise<Workflow | undefined> {
    const buildStart = Date.now();
    try {
      logMessage('info', `📝 Building workflow ${workflowConfig.name}...`, this.metadata);

      const builder = new ToolBuilder(
        workflowConfig.instruction,
        integrations,
        workflowConfig.payload || {},
        {},
        this.metadata
      );

      const workflow: Workflow = await builder.buildTool();
      // Make sure the id does not clash with a workflow already in the datastore.
      workflow.id = await generateUniqueId({
        baseId: workflow.id,
        exists: async (id) =>
          !!(await this.datastore.getWorkflow({ id, orgId: this.metadata.orgId }))
      });

      attempt.buildSuccess = true;
      attempt.workflowPlan = workflow;
      attempt.buildTime = Date.now() - buildStart;

      logMessage(
        'info',
        `🔨 Build successful for ${workflowConfig.name} in ${attempt.buildTime}ms`,
        this.metadata
      );
      return workflow;
    } catch (error: unknown) {
      attempt.buildTime = Date.now() - buildStart;
      attempt.buildError = describeThrown(error);
      logMessage(
        'error',
        `❌ Build failed for ${workflowConfig.name}: ${attempt.buildError}`,
        this.metadata
      );
      return undefined;
    }
  }

  /** Execute a built workflow; records timing/outcome on `attempt`, returns undefined when execution throws. */
  private async executePhase(
    workflow: Workflow,
    workflowConfig: BaseWorkflowConfig,
    integrations: Integration[],
    attempt: WorkflowRunAttempt
  ): Promise<WorkflowResult | undefined> {
    const execStart = Date.now();
    try {
      logMessage('info', `🚀 Executing workflow ${workflowConfig.name}...`, this.metadata);

      const metadataWithWorkflowId = {
        ...this.metadata,
        workflowId: workflowConfig.id,
        runId: `${workflowConfig.id}-${attempt.attemptNumber}`
      };

      const executor = new WorkflowExecutor({
        workflow,
        metadata: metadataWithWorkflowId,
        integrations: IntegrationManager.fromIntegrations(
          integrations,
          this.datastore,
          this.metadata.orgId
        )
      });

      const workflowResult: WorkflowResult = await executor.execute({
        payload: workflowConfig.payload || {},
        credentials: this.collectCredentials(integrations),
        options: {}
      });

      attempt.executionTime = Date.now() - execStart;
      attempt.result = workflowResult;
      attempt.executionSuccess = workflowResult.success;
      return workflowResult;
    } catch (error: unknown) {
      attempt.executionTime = Date.now() - execStart;
      attempt.executionError = describeThrown(error);
      logMessage(
        'error',
        `❌ Execution error for ${workflowConfig.name}: ${attempt.executionError}`,
        this.metadata
      );
      return undefined;
    }
  }

  /** Combine all integration credentials into one map keyed `<integrationId>_<credentialKey>`. */
  private collectCredentials(integrations: Integration[]): Record<string, string> {
    const allCredentials: Record<string, string> = {};
    for (const integ of integrations) {
      if (integ.credentials && typeof integ.credentials === 'object') {
        for (const [key, value] of Object.entries(integ.credentials)) {
          allCredentials[`${integ.id}_${key}`] = value as string;
        }
      }
    }
    return allCredentials;
  }

  /**
   * Persist a run record on a best-effort basis.
   *
   * A datastore failure is logged as a warning and does not change the
   * attempt's execution outcome.
   */
  private async persistRun(
    workflowResult: WorkflowResult,
    workflow: Workflow,
    workflowName: string
  ): Promise<void> {
    try {
      await this.datastore.createRun({
        result: {
          id: workflowResult.id,
          success: workflowResult.success,
          error: workflowResult.error || undefined,
          config: workflowResult.config || workflow,
          stepResults: workflowResult.stepResults || [],
          startedAt: workflowResult.startedAt,
          completedAt: workflowResult.completedAt || new Date(),
          // The result payload itself is not stored with the run record.
          data: null
        },
        orgId: this.metadata.orgId
      });
    } catch (error: unknown) {
      logMessage(
        'warn',
        `⚠️ Failed to save run for ${workflowName}: ${describeThrown(error)}`,
        this.metadata
      );
    }
  }

  /**
   * Run multiple workflows in sequence.
   *
   * A workflow is skipped (with a warning) when any integration id it lists
   * is absent from `integrations`.
   *
   * @param workflows - workflows to run, in order
   * @param integrations - pool of available integrations
   * @param options - options forwarded to each `runWorkflow` call
   * @param onWorkflowComplete - optional hook invoked after each workflow finishes
   * @returns results for the workflows that were actually run
   */
  async runWorkflows(
    workflows: BaseWorkflowConfig[],
    integrations: Integration[],
    options: WorkflowRunnerOptions,
    onWorkflowComplete?: (result: WorkflowRunResult) => void
  ): Promise<WorkflowRunResult[]> {
    const results: WorkflowRunResult[] = [];

    for (const workflow of workflows) {
      // Get integrations for this workflow
      const requiredIds = new Set<string>(workflow.integrationIds);
      const workflowIntegrations = integrations.filter((i) => requiredIds.has(i.id));

      // Compare by id rather than by array length so duplicate ids on either
      // side cannot cause a false "not available" skip.
      const availableIds = new Set<string>(workflowIntegrations.map((i) => i.id));
      const missingIds = [...requiredIds].filter((id) => !availableIds.has(id));

      if (missingIds.length > 0) {
        logMessage(
          'warn',
          `⚠️ Workflow ${workflow.name} requires integrations that are not available`,
          this.metadata
        );
        continue;
      }

      const result = await this.runWorkflow(workflow, workflowIntegrations, options);
      results.push(result);

      if (onWorkflowComplete) {
        onWorkflowComplete(result);
      }
    }

    return results;
  }
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/superglue-ai/superglue'

If you have feedback or need assistance with the MCP directory API, please join our Discord server