Skip to main content
Glama

Vibe Coder MCP

by freshtechbro
workflowExecutor.ts23.3 kB
// src/services/workflows/workflowExecutor.ts import fs from 'fs-extra'; import path from 'path'; import { fileURLToPath } from 'url'; // Need this for relative pathing in ES Modules import { CallToolResult } from '@modelcontextprotocol/sdk/types.js'; import { executeTool, ToolExecutionContext } from '../routing/toolRegistry.js'; // Import ToolExecutionContext import { OpenRouterConfig } from '../../types/workflow.js'; import logger from '../../logger.js'; import { AppError, ToolExecutionError, ConfigurationError, ParsingError } from '../../utils/errors.js'; import { jobManager, JobStatus } from '../job-manager/index.js'; // Import Job Manager import { sseNotifier } from '../sse-notifier/index.js'; // Import SSE Notifier // --- Constants for Adaptive Job Polling --- const INITIAL_POLLING_INTERVAL_MS = 1000; // Start with 1 second const MAX_POLLING_INTERVAL_MS = 10000; // Maximum 10 seconds between polls const POLLING_BACKOFF_FACTOR = 1.5; // Increase interval by 50% each time const MAX_POLLING_DURATION_MS = 300000; // Maximum 5 minutes total polling time // --- Interfaces --- /** Defines a single step within a workflow template. */ interface WorkflowStep { /** Unique identifier for this step within the workflow. */ id: string; /** The name of the tool to execute for this step. */ toolName: string; /** Parameters for the tool, where values can be static or template strings. */ params: Record<string, string>; // Values are templates like "{workflow.input.xyz}" or "{steps.id.output...}" } /** Defines the structure of a workflow template. */ interface WorkflowDefinition { /** A description of what the workflow achieves. */ description: string; /** Optional schema defining expected inputs for the entire workflow. */ inputSchema?: Record<string, string>; // Simple type check for now /** Ordered array of steps to execute. */ steps: WorkflowStep[]; /** Optional template defining the structure of the final workflow output. */ output?: Record<string, string>; } /** Defines the expected structure of the workflows JSON file. */ interface WorkflowFileFormat { /** A map where keys are workflow names and values are WorkflowDefinition objects. */ workflows: Record<string, WorkflowDefinition>; } /** Defines the result structure returned by executeWorkflow. */ export interface WorkflowResult { /** Indicates if the workflow completed all steps successfully. */ success: boolean; /** A summary message indicating the outcome. */ message: string; /** Optional: The processed final output based on the workflow's output template. */ outputs?: Record<string, unknown>; /** Optional: Raw results of each executed step, keyed by step ID. */ stepResults?: Map<string, CallToolResult>; /** Optional: Details about the error if the workflow failed. */ error?: { /** The ID of the step where the error occurred. */ stepId?: string; /** The name of the tool that failed. */ toolName?: string; /** The error message. */ message: string; /** Additional error details, if available. */ details?: Record<string, unknown>; }; } // --- Store for loaded definitions --- let loadedWorkflows = new Map<string, WorkflowDefinition>(); // --- Calculate default path relative to this file --- const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); // Go up three levels (src/services/workflows -> src/services -> src -> project root) const defaultWorkflowPath = path.resolve(__dirname, '../../../workflows.json'); /** * Loads and performs basic validation on workflow definitions from a JSON file. * Clears existing loaded workflows before attempting to load. * Logs warnings or errors but does not throw if loading fails, allowing the server to start. * @param filePath Path to the workflows JSON file. Defaults to 'workflows.json' in the project root. */ export function loadWorkflowDefinitions(filePath: string = defaultWorkflowPath): void { logger.info(`Attempting to load workflow definitions from: ${filePath}`); loadedWorkflows = new Map(); // Clear previous definitions first try { if (!fs.existsSync(filePath)) { logger.warn(`Workflow definition file not found: ${filePath}. No workflows loaded.`); return; } const fileContent = fs.readFileSync(filePath, 'utf-8'); const workflowData: WorkflowFileFormat = JSON.parse(fileContent); // Basic structural validation if (!workflowData || typeof workflowData.workflows !== 'object' || workflowData.workflows === null) { throw new ConfigurationError('Invalid workflow file format: Root "workflows" object missing or invalid.'); } // TODO: Add more robust validation using Zod for the WorkflowFileFormat structure // This would involve defining Zod schemas for WorkflowDefinition, WorkflowStep etc. // and parsing workflowData against that schema. loadedWorkflows = new Map(Object.entries(workflowData.workflows)); logger.info(`Successfully loaded ${loadedWorkflows.size} workflow definitions.`); } catch (error) { logger.error({ err: error, filePath }, 'Failed to load or parse workflow definitions. Workflows will be unavailable.'); // Clear workflows again in case of partial loading before error loadedWorkflows = new Map(); // Consider re-throwing if workflow loading is critical: // if (error instanceof Error) { // throw new ConfigurationError(`Failed to load workflows from ${filePath}: ${error.message}`, undefined, error); // } else { // throw new ConfigurationError(`Failed to load workflows from ${filePath}: Unknown error.`); // } } } /** * Resolves a parameter value template string against workflow inputs and step outputs. * Handles simple path traversal for step outputs (e.g., `content[0].text`). * * @param valueTemplate Template string (e.g., "{workflow.input.xyz}", "{steps.id.output.content[0].text}") or a literal value. * @param workflowInput The initial input object provided to the workflow. * @param stepOutputs A Map containing results (CallToolResult) from previously executed steps, keyed by step ID. * @returns The resolved value. * @throws {ParsingError} if the template syntax is invalid or the referenced data cannot be found. */ function resolveParamValue( valueTemplate: unknown, workflowInput: Record<string, unknown>, stepOutputs: Map<string, CallToolResult> ): unknown { if (typeof valueTemplate !== 'string') { return valueTemplate; // Return non-strings (like numbers, booleans, objects from template) directly } const match = valueTemplate.match(/^{(workflow\.input\.([\w-]+))|(steps\.([\w-]+)\.output\.(.+))}$/); if (!match) { return valueTemplate; // Not a template, return as literal string } try { if (match[1]) { // workflow.input match (e.g., {workflow.input.productDescription}) const inputKey = match[2]; if (workflowInput && typeof workflowInput === 'object' && inputKey in workflowInput) { logger.debug(`Resolved template '${valueTemplate}' from workflow input key '${inputKey}'.`); // Use type assertion to tell TypeScript that this access is valid // We've already checked that inputKey exists in workflowInput return workflowInput[inputKey as keyof typeof workflowInput]; } throw new ParsingError(`Workflow input key "${inputKey}" not found for template '${valueTemplate}'.`); } else if (match[3]) { // steps match (e.g., {steps.step1_id.output.content[0].text}) const stepId = match[4]; const outputPath = match[5]; // e.g., 'content[0].text' or 'errorDetails.message' const stepResult = stepOutputs.get(stepId); if (!stepResult) { throw new ParsingError(`Output from step "${stepId}" not found (required for template '${valueTemplate}'). Ensure step IDs match and the step executed.`); } // Basic path traversal - handle potential errors gracefully let currentValue: unknown = stepResult; // Split by '.' or array access like '[0]' const pathParts = outputPath.match(/([^[.\]]+)|\[(\d+)\]/g); if (!pathParts) { throw new ParsingError(`Invalid output path format '${outputPath}' in template '${valueTemplate}'.`); } for (const part of pathParts) { if (currentValue === null || currentValue === undefined) { throw new ParsingError(`Cannot access path part '${part}' in '${outputPath}' from step '${stepId}' output because parent value is null or undefined. Template: '${valueTemplate}'.`); } const arrayMatch = part.match(/^\[(\d+)\]$/); if (arrayMatch) { // Array index like '[0]' const index = parseInt(arrayMatch[1], 10); if (!Array.isArray(currentValue) || index >= currentValue.length) { throw new ParsingError(`Index ${index} out of bounds for array in path '${outputPath}' from step '${stepId}'. Template: '${valueTemplate}'.`); } currentValue = currentValue[index]; } else { // Object key if (typeof currentValue !== 'object' || currentValue === null || !(part in currentValue)) { throw new ParsingError(`Key '${part}' not found in object path '${outputPath}' from step '${stepId}'. Template: '${valueTemplate}'.`); } // Use a type assertion to access the property after we've verified it exists currentValue = (currentValue as Record<string, unknown>)[part]; } } if (currentValue === undefined) { // It's possible for a valid path to resolve to undefined. Log and return it. logger.warn(`Resolved path '${outputPath}' resulted in undefined for step '${stepId}'. Template: '${valueTemplate}'`); } logger.debug(`Resolved template '${valueTemplate}' from step '${stepId}' output path '${outputPath}'.`); return currentValue; } } catch (error) { if (error instanceof ParsingError) throw error; // Re-throw known parsing errors logger.error({ err: error, template: valueTemplate }, `Error resolving parameter template`); // Wrap unexpected errors throw new ParsingError(`Unexpected error resolving template "${valueTemplate}": ${error instanceof Error ? error.message : String(error)}`); } // Should not be reached if regex matches correctly logger.warn(`Template '${valueTemplate}' matched regex but failed to resolve logic.`); return valueTemplate; // Fallback to literal } /** * Checks if a CallToolResult indicates a background job was started. * @param result The CallToolResult from executeTool. * @returns The Job ID if found, otherwise null. */ function getJobIdFromResult(result: CallToolResult): string | null { if (result.isError || !result.content || result.content.length === 0) { return null; } const textContent = result.content[0]?.text; if (typeof textContent === 'string') { const match = textContent.match(/Job ID: (\S+)/); if (match && match[1]) { return match[1]; } } return null; } /** * Waits for a background job to complete by polling the JobManager. * Uses an adaptive polling strategy with exponential backoff. * Sends progress updates via SSE. * @param jobId The ID of the job to wait for. * @param stepId The ID of the workflow step associated with this job. * @param sessionId The session ID for SSE notifications. * @returns The final CallToolResult from the completed or failed job. * @throws {ToolExecutionError} if the job is not found, polling times out, or the job fails unexpectedly. */ async function waitForJobCompletion(jobId: string, stepId: string, sessionId: string): Promise<CallToolResult> { logger.info({ jobId, stepId, sessionId }, `Waiting for background job to complete...`); let currentInterval = INITIAL_POLLING_INTERVAL_MS; let totalWaitTime = 0; let attempts = 0; const startTime = Date.now(); while (totalWaitTime < MAX_POLLING_DURATION_MS) { // Wait for the current interval await new Promise(resolve => setTimeout(resolve, currentInterval)); // Update tracking variables totalWaitTime = Date.now() - startTime; attempts++; // Get the job const job = jobManager.getJob(jobId); if (!job) { logger.error({ jobId, stepId, sessionId }, `Job not found during polling.`); throw new ToolExecutionError(`Background job ${jobId} for step ${stepId} was not found.`); } // Check if the job is completed or failed if (job.status === JobStatus.COMPLETED || job.status === JobStatus.FAILED) { logger.info({ jobId, stepId, sessionId, status: job.status }, `Job ${job.status.toLowerCase()}.`); if (!job.result) { logger.error({ jobId, stepId, sessionId, status: job.status }, `Job finished but has no result stored.`); throw new ToolExecutionError(`Background job ${jobId} for step ${stepId} finished with status ${job.status} but has no result.`); } // Send final status update via SSE sseNotifier.sendProgress(sessionId, jobId, job.status, job.status === JobStatus.COMPLETED ? `Job completed successfully.` : `Job failed: ${job.progressMessage || 'No error message available.'}` ); return job.result; } // Still running or pending logger.debug({ jobId, stepId, sessionId, status: job.status, attempt: attempts, currentInterval, totalWaitTime, maxWaitTime: MAX_POLLING_DURATION_MS }, `Polling job status: ${job.status}`); // Send intermediate progress update via SSE const progressMessage = `Job status: ${job.status}... (Polling for ${Math.floor(totalWaitTime / 1000)}s, next check in ${currentInterval / 1000}s)`; sseNotifier.sendProgress(sessionId, jobId, job.status, progressMessage); // Increase the polling interval for next iteration (with a maximum) currentInterval = Math.min(currentInterval * POLLING_BACKOFF_FACTOR, MAX_POLLING_INTERVAL_MS); } // If loop finishes, it means timeout logger.error({ jobId, stepId, sessionId, attempts, totalWaitTime }, `Polling timed out for job.`); throw new ToolExecutionError(`Timed out waiting for background job ${jobId} for step ${stepId} to complete after ${Math.floor(totalWaitTime / 1000)} seconds.`); } /** * Executes a predefined workflow by its name. * Iterates through the steps, resolves parameters, executes tools, handles potential background jobs, and manages errors. * * @param workflowName The name of the workflow (must be loaded). * @param workflowInput Input data for the workflow, matching its inputSchema. * @param config OpenRouter configuration passed to tools. * @param context Optional ToolExecutionContext containing sessionId for SSE. * @returns A promise resolving to the WorkflowResult. */ export async function executeWorkflow( workflowName: string, workflowInput: Record<string, unknown>, config: OpenRouterConfig, context?: ToolExecutionContext // Accept context ): Promise<WorkflowResult> { const workflow = loadedWorkflows.get(workflowName); const sessionId = context?.sessionId || `no-session-${Math.random().toString(36).substring(2)}`; // Get sessionId or generate placeholder if (!workflow) { logger.error(`Workflow "${workflowName}" not found.`); return { success: false, message: `Workflow "${workflowName}" not found.`, error: { message: `Workflow "${workflowName}" not found.`} }; } logger.info({ workflowName, sessionId }, `Starting workflow execution.`); // Use Map for step outputs as it preserves insertion order if needed, and allows any string key const stepOutputs = new Map<string, CallToolResult>(); let currentStepIndex = 0; let currentStep: WorkflowStep | undefined; try { // TODO: Optional: Validate workflowInput against workflow.inputSchema here if defined for (const step of workflow.steps) { currentStep = step; // Keep track of the current step for error reporting currentStepIndex++; const stepLogContext = { workflowName, sessionId, stepId: step.id, toolName: step.toolName, stepNum: currentStepIndex }; logger.info(stepLogContext, `Executing workflow step ${currentStepIndex}/${workflow.steps.length}`); // Use sendProgress for step start notification - use step.id as identifier since jobId isn't known yet sseNotifier.sendProgress(sessionId, step.id, JobStatus.RUNNING, `Workflow '${workflowName}': Starting step ${currentStepIndex} ('${step.id}' - ${step.toolName}).`); // Resolve parameters for this step const resolvedParams: Record<string, unknown> = {}; for (const [key, template] of Object.entries(step.params)) { try { resolvedParams[key] = resolveParamValue(template, workflowInput, stepOutputs); logger.debug(`Resolved param '${key}' for step '${step.id}'`); } catch (resolveError) { // If a parameter cannot be resolved, fail the workflow immediately logger.error({ err: resolveError, ...stepLogContext, paramKey: key, template }, `Failed to resolve parameter`); throw new AppError(`Failed to resolve parameter '${key}' for step '${step.id}': ${(resolveError as Error).message}`, { stepId: step.id, paramKey: key }, resolveError instanceof Error ? resolveError : undefined); } } // Execute the tool for this step, passing the context let stepResult = await executeTool(step.toolName, resolvedParams, config, context); // --- Handle potential background job --- const jobId = getJobIdFromResult(stepResult); if (jobId) { logger.info({ ...stepLogContext, jobId }, `Tool returned a background job ID. Waiting for completion...`); // Use sendProgress for step update notification sseNotifier.sendProgress(sessionId, jobId, JobStatus.RUNNING, `Workflow '${workflowName}' step '${step.id}': Waiting for background job ${jobId}...`); try { // Wait for the job and get its final result stepResult = await waitForJobCompletion(jobId, step.id, sessionId); logger.info({ ...stepLogContext, jobId, finalStatus: stepResult.isError ? 'FAILED' : 'COMPLETED' }, `Background job finished.`); } catch (jobError) { logger.error({ err: jobError, ...stepLogContext, jobId }, `Error waiting for background job.`); // Propagate the job waiting error, adding workflow context throw new ToolExecutionError(`Step '${step.id}' failed while waiting for background job ${jobId}: ${jobError instanceof Error ? jobError.message : String(jobError)}`, { stepId: step.id, toolName: step.toolName, jobId }); } } // --- End Job Handling --- // Store the final result (either immediate or from the job), keyed by step ID stepOutputs.set(step.id, stepResult); // Check for errors from the final tool execution result if (stepResult.isError) { const stepErrorMessage = stepResult.content[0]?.text || 'Unknown tool error'; logger.error({ ...stepLogContext, errorResult: stepResult }, `Workflow step failed.`); // Use sendProgress for step failure notification sseNotifier.sendProgress(sessionId, jobId || step.id, JobStatus.FAILED, `Workflow '${workflowName}' step '${step.id}' failed: ${stepErrorMessage}`); // Propagate the error, adding workflow context throw new ToolExecutionError(`Step '${step.id}' (Tool: ${step.toolName}) failed: ${stepErrorMessage}`, { stepId: step.id, toolName: step.toolName, toolResult: stepResult }); } logger.debug(stepLogContext, `Workflow step completed successfully.`); // Use sendProgress for step success notification sseNotifier.sendProgress(sessionId, jobId || step.id, JobStatus.COMPLETED, `Workflow '${workflowName}' step '${step.id}' completed successfully.`); } // End loop through steps // Process final output if defined let finalOutputData: Record<string, unknown> | undefined; let finalMessage = `Workflow "${workflowName}" completed successfully.`; if (workflow.output) { finalOutputData = {}; logger.debug(`Processing final workflow output template for ${workflowName}`); for (const [key, template] of Object.entries(workflow.output)) { try { finalOutputData[key] = resolveParamValue(template, workflowInput, stepOutputs); if (key === 'summary' && typeof finalOutputData[key] === 'string') { finalMessage = finalOutputData[key] as string; // Use template summary if available } } catch (resolveError) { logger.warn({ err: resolveError, key, template }, `Could not resolve output template key '${key}' for workflow ${workflowName}. Skipping.`); // Include error in the output for visibility finalOutputData[key] = `Error: Failed to resolve output template - ${(resolveError as Error).message}`; } } } logger.info({ workflowName, sessionId }, `Workflow execution finished successfully.`); return { success: true, message: finalMessage, outputs: finalOutputData, stepResults: stepOutputs, // Include raw results for debugging/auditing }; } catch (error) { // Catch errors from parameter resolution or tool execution/job waiting logger.error({ err: error, workflowName, sessionId, failedStepId: currentStep?.id, failedToolName: currentStep?.toolName }, `Workflow execution failed.`); const errDetails = { stepId: currentStep?.id, toolName: currentStep?.toolName, message: error instanceof Error ? error.message : 'Unknown workflow execution error', details: error instanceof AppError ? error.context : undefined, }; // Ensure SSE notification for the failed step if it wasn't sent already (using sendProgress) if (currentStep) { sseNotifier.sendProgress(sessionId, currentStep.id, JobStatus.FAILED, `Workflow '${workflowName}' failed at step '${currentStep.id}': ${errDetails.message}`); } return { success: false, message: `Workflow "${workflowName}" failed at step ${currentStepIndex} (${currentStep?.toolName || 'N/A'}): ${errDetails.message}`, stepResults: stepOutputs, // Include results up to the point of failure error: errDetails, }; } } // --- Load definitions on server startup --- // Ensures workflows are ready when the server starts. // Consider making this async if validation becomes complex or involves I/O. loadWorkflowDefinitions();

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/freshtechbro/vibe-coder-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server