import pino from 'pino';
import { McpPromptsClient } from '../adapters/mcp-prompts-client.js';
import { PromptLayer, Domain } from '@sparesparrow/mcp-fbs';
const logger = pino({
level: process.env.LOG_LEVEL || 'info',
transport: process.env.NODE_ENV === 'development' ? {
target: 'pino-pretty',
options: {
colorize: true
}
} : undefined
});
export interface WorkflowStep {
name: string;
tool: string;
args: Record<string, any>;
depends_on?: string[];
on_failure?: 'continue' | 'stop' | 'retry';
retry_count?: number;
}
export interface Workflow {
name: string;
description: string;
steps: WorkflowStep[];
context_requirements?: string[];
}
export class WorkflowOrchestrationService {
private mcpClient: McpPromptsClient;
constructor(mcpClient?: McpPromptsClient) {
this.mcpClient = mcpClient || new McpPromptsClient();
}
/**
* Execute a predefined workflow
*/
async executeWorkflow(workflowName: string, context: Record<string, any> = {}): Promise<any> {
logger.info(`Executing workflow: ${workflowName}`, { context });
// Query mcp-prompts for workflow configuration
const workflowPrompt = await this.mcpClient.getPrompt(`${workflowName}-workflow`);
if (!workflowPrompt) {
throw new Error(`Workflow not found: ${workflowName}`);
}
// Parse workflow definition from prompt content
const workflow = this.parseWorkflowFromPrompt(workflowPrompt.content);
// Execute workflow steps
const results = [];
for (const step of workflow.steps) {
try {
logger.info(`Executing step: ${step.name}`);
// Check dependencies
if (step.depends_on) {
const dependenciesMet = step.depends_on.every(dep =>
results.some(r => r.step === dep && r.success)
);
if (!dependenciesMet) {
throw new Error(`Dependencies not met for step ${step.name}: ${step.depends_on.join(', ')}`);
}
}
// Execute step
const result = await this.executeStep(step, context);
results.push({
step: step.name,
success: true,
result
});
} catch (error) {
logger.error(`Step ${step.name} failed:`, error);
const result = {
step: step.name,
success: false,
error: error.message
};
results.push(result);
// Handle failure based on step configuration
if (step.on_failure === 'stop') {
break;
} else if (step.on_failure === 'retry' && step.retry_count && step.retry_count > 0) {
// Implement retry logic
logger.info(`Retrying step ${step.name}, ${step.retry_count} attempts remaining`);
// TODO: Implement retry logic
}
// 'continue' is default behavior
}
}
// Capture workflow execution as new episodic knowledge
await this.captureWorkflowExecution(workflowName, results, context);
return {
workflow: workflowName,
total_steps: workflow.steps.length,
completed_steps: results.filter(r => r.success).length,
failed_steps: results.filter(r => !r.success).length,
results
};
}
/**
* Execute a single workflow step
*/
private async executeStep(step: WorkflowStep, context: Record<string, any>): Promise<any> {
// Query for tool configuration prompts
const configPrompt = await this.mcpClient.getPrompt(`${step.tool}-configuration`);
const config = configPrompt ? this.parseConfigFromPrompt(configPrompt.content) : {};
// Merge context with step args and config
const mergedArgs = {
...config,
...context,
...step.args
};
// Execute the tool (delegate to appropriate domain service)
// This is a simplified version - in reality, we'd have domain-specific services
switch (step.tool) {
case 'analyze_code':
return await this.executeAnalyzeCode(mergedArgs);
case 'run_tests':
return await this.executeRunTests(mergedArgs);
case 'git_analyze_history':
return await this.executeGitAnalyzeHistory(mergedArgs);
default:
throw new Error(`Unknown tool: ${step.tool}`);
}
}
/**
* Parse workflow definition from prompt content
*/
private parseWorkflowFromPrompt(content: string): Workflow {
// TODO: Implement proper parsing of workflow definitions from prompts
// For now, return a mock workflow
return {
name: 'mock-workflow',
description: 'Mock workflow for development',
steps: [
{
name: 'analyze',
tool: 'analyze_code',
args: { files: ['src/main.cpp'] }
},
{
name: 'test',
tool: 'run_tests',
args: {},
depends_on: ['analyze']
}
]
};
}
/**
* Parse configuration from prompt content
*/
private parseConfigFromPrompt(content: string): Record<string, any> {
// TODO: Implement proper parsing of configuration from prompts
// For now, return mock config
return {
enable_all_checks: true,
language: 'cpp'
};
}
/**
* Capture workflow execution as episodic knowledge
*/
private async captureWorkflowExecution(
workflowName: string,
results: any[],
context: Record<string, any>
): Promise<void> {
const episode = {
name: `workflow-execution-${workflowName}-${Date.now()}`,
problem_signature: {
symptoms: context.symptoms || ['workflow execution'],
context_fingerprint: JSON.stringify(context),
affected_components: [workflowName]
},
context: {
project_type: context.project_type || 'unknown',
domain: 'software-development'
},
investigation_path: results.map(r => ({
step_number: results.indexOf(r) + 1,
action: `Execute ${r.step}`,
findings: r.success ? 'Success' : `Failed: ${r.error}`,
led_to_solution: r.success
})),
solution: {
description: `Executed ${workflowName} workflow`,
implementation_steps: results.map(r => r.step),
success: results.every(r => r.success)
},
success: results.every(r => r.success),
cognitive_load: results.length,
tags: ['workflow', 'automation', workflowName]
};
await this.mcpClient.captureEpisode({
name: episode.name,
problem_signature: episode.problem_signature,
context: episode.context,
investigation_path: episode.investigation_path,
solution: episode.solution,
success: episode.success,
cognitive_load: episode.cognitive_load,
tags: episode.tags
});
}
// Tool execution methods (simplified implementations)
private async executeAnalyzeCode(args: any): Promise<any> {
// TODO: Implement actual code analysis
logger.info('Executing code analysis', args);
return { tool: 'analyze_code', status: 'completed', args };
}
private async executeRunTests(args: any): Promise<any> {
// TODO: Implement actual test execution
logger.info('Executing tests', args);
return { tool: 'run_tests', status: 'completed', args };
}
private async executeGitAnalyzeHistory(args: any): Promise<any> {
// TODO: Implement actual git analysis
logger.info('Executing git history analysis', args);
return { tool: 'git_analyze_history', status: 'completed', args };
}
/**
* Define a new workflow from successful execution patterns
*/
async createWorkflowFromPattern(patternName: string, executions: any[]): Promise<void> {
// Analyze successful executions to extract common patterns
const commonSteps = this.extractCommonSteps(executions);
const workflow: Workflow = {
name: patternName,
description: `Workflow derived from successful ${patternName} executions`,
steps: commonSteps
};
// Store as new procedural knowledge
await this.mcpClient.createPrompt({
name: `${patternName}-workflow`,
content: JSON.stringify(workflow, null, 2),
layer: PromptLayer.Procedural,
domain: Domain.SoftwareDevelopment,
tags: ['workflow', 'derived', patternName]
});
}
/**
* Extract common steps from successful executions
*/
private extractCommonSteps(executions: any[]): WorkflowStep[] {
// TODO: Implement pattern extraction logic
// For now, return mock steps
return [
{
name: 'analyze',
tool: 'analyze_code',
args: {}
},
{
name: 'test',
tool: 'run_tests',
args: {},
depends_on: ['analyze']
}
];
}
}