Skip to main content
Glama
WorkflowOrchestrator.ts21.6 kB
import { EventEmitter } from 'events'; import { GitHubWorkflowManager } from './GitHubWorkflowManager.js'; import { LintingManager } from './LintingManager.js'; import { FeedbackProcessor } from './FeedbackProcessor.js'; import { TitanMemoryModel } from '../model.js'; import type { WorkflowConfig, WorkflowStatus, WorkflowEvent, WorkflowMetrics, TitanMemorySystem } from '../types.js'; export class WorkflowOrchestrator extends EventEmitter { private config: WorkflowConfig; private memory: TitanMemorySystem; private gitHubManager!: GitHubWorkflowManager; private lintingManager!: LintingManager; private feedbackProcessor!: FeedbackProcessor; private status: WorkflowStatus; private metrics: WorkflowMetrics; private healthCheckInterval?: NodeJS.Timeout; private isInitialized = false; constructor(config: WorkflowConfig) { super(); this.config = config; this.memory = new TitanMemoryModel(config.memory.titanConfig); this.status = { state: 'initializing', lastUpdate: new Date(), activeWorkflows: [], health: 'unknown' }; this.metrics = { totalWorkflows: 0, successfulWorkflows: 0, failedWorkflows: 0, averageExecutionTime: 0, memoryUsage: 0, lastMetricsUpdate: new Date() }; } /** * Initialize the workflow orchestrator and all components */ async initialize(): Promise<void> { try { this.emit('status', { type: 'initialization', message: 'Starting workflow orchestrator' }); // Initialize memory system await this.memory.initialize(); this.emit('status', { type: 'initialization', message: 'Memory system initialized' }); // Initialize workflow managers this.gitHubManager = new GitHubWorkflowManager(this.config, this.memory); this.lintingManager = new LintingManager(this.config, this.memory); this.feedbackProcessor = new FeedbackProcessor(this.config, this.memory); // Set up event listeners this.setupEventListeners(); // Start health monitoring this.startHealthMonitoring(); this.status.state = 'ready'; this.status.health = 'healthy'; this.isInitialized = true; this.emit('status', { type: 'ready', message: 'Workflow orchestrator ready' }); } catch (error) { this.status.state = 'error'; this.status.health = 'unhealthy'; this.emit('error', { type: 'initialization', error }); throw error; } } /** * Execute a workflow by name with parameters */ async executeWorkflow(workflowName: string, params: Record<string, any> = {}): Promise<any> { if (!this.isInitialized) { throw new Error('Workflow orchestrator not initialized'); } const workflowId = this.generateWorkflowId(); const startTime = Date.now(); try { this.status.activeWorkflows.push({ id: workflowId, name: workflowName, startTime: new Date(), status: 'running' }); this.emit('workflow:start', { id: workflowId, name: workflowName, params }); let result: any; switch (workflowName) { case 'auto-release': result = await this.executeAutoRelease(params); break; case 'process-issue': result = await this.executeProcessIssue(params); break; case 'collect-feedback': result = await this.executeCollectFeedback(params); break; case 'quality-check': result = await this.executeQualityCheck(params); break; case 'full-pipeline': result = await this.executeFullPipeline(params); break; default: throw new Error(`Unknown workflow: ${workflowName}`); } const executionTime = Date.now() - startTime; this.updateMetrics(true, executionTime); this.removeActiveWorkflow(workflowId); this.emit('workflow:complete', { id: workflowId, name: workflowName, result, executionTime }); // Store successful workflow execution in memory await this.memory.storeWorkflowMemory('workflow_execution', { workflowId, workflowName, params, result, executionTime, success: true, timestamp: new Date() }); return result; } catch (error) { const executionTime = Date.now() - startTime; this.updateMetrics(false, executionTime); this.removeActiveWorkflow(workflowId); this.emit('workflow:error', { id: workflowId, name: workflowName, error, executionTime }); // Store failed workflow execution in memory for learning await this.memory.storeWorkflowMemory('workflow_execution', { workflowId, workflowName, params, error: error.message, executionTime, success: false, timestamp: new Date() }); throw error; } } /** * Execute auto-release workflow */ private async executeAutoRelease(params: any): Promise<any> { this.emit('workflow:step', { step: 'auto-release', status: 'starting' }); // Check if release should be triggered const shouldRelease = await this.shouldTriggerRelease(params); if (!shouldRelease) { return { triggered: false, reason: 'Release conditions not met' }; } // Run pre-release quality checks this.emit('workflow:step', { step: 'quality-checks', status: 'running' }); const qualityResults = await this.lintingManager.runFullQualitySuite(); if (!qualityResults.passed) { throw new Error(`Quality checks failed: ${qualityResults.issues.join(', ')}`); } // Create release PR this.emit('workflow:step', { step: 'create-pr', status: 'running' }); const releasePR = await this.gitHubManager.createReleasePR(); this.emit('workflow:step', { step: 'auto-release', status: 'completed' }); return { triggered: true, releasePR, qualityResults }; } /** * Execute issue processing workflow */ private async executeProcessIssue(params: { issueNumber: number }): Promise<any> { this.emit('workflow:step', { step: 'process-issue', status: 'starting' }); // Classify and label the issue const classification = await this.gitHubManager.processIssue(params.issueNumber); // Generate recommendations based on classification const recommendations = await this.generateIssueRecommendations(classification); // Check for potential automation opportunities const automationSuggestions = await this.identifyAutomationOpportunities(classification); this.emit('workflow:step', { step: 'process-issue', status: 'completed' }); return { classification, recommendations, automationSuggestions }; } /** * Execute feedback collection workflow */ private async executeCollectFeedback(params: any): Promise<any> { this.emit('workflow:step', { step: 'collect-feedback', status: 'starting' }); // Collect feedback from all configured channels const feedbackItems = await this.gitHubManager.collectFeedback(); // Process and analyze feedback const analysis = await this.feedbackProcessor.processFeedback(feedbackItems); // Generate action items const actionItems = await this.generateFeedbackActionItems(analysis); this.emit('workflow:step', { step: 'collect-feedback', status: 'completed' }); return { feedbackItems, analysis, actionItems }; } /** * Execute quality check workflow */ private async executeQualityCheck(params: { prNumber?: number }): Promise<any> { this.emit('workflow:step', { step: 'quality-check', status: 'starting' }); // Run comprehensive quality checks const results = await this.gitHubManager.runQualityChecks(params.prNumber); // Generate improvement suggestions const suggestions = await this.generateQualityImprovements(results); this.emit('workflow:step', { step: 'quality-check', status: 'completed' }); return { ...results, suggestions }; } /** * Execute full pipeline workflow */ private async executeFullPipeline(params: any): Promise<any> { this.emit('workflow:step', { step: 'full-pipeline', status: 'starting' }); const results: Record<string, any> = {}; // Step 1: Collect and process feedback try { results.feedback = await this.executeCollectFeedback({}); } catch (error) { console.warn('Feedback collection failed:', error); results.feedback = { error: error.message }; } // Step 2: Run quality checks try { results.quality = await this.executeQualityCheck({}); } catch (error) { console.warn('Quality checks failed:', error); results.quality = { error: error.message }; } // Step 3: Check for auto-release try { results.release = await this.executeAutoRelease({}); } catch (error) { console.warn('Auto-release failed:', error); results.release = { error: error.message }; } // Step 4: Generate summary and recommendations results.summary = await this.generatePipelineSummary(results); this.emit('workflow:step', { step: 'full-pipeline', status: 'completed' }); return results; } /** * Get current workflow status */ getStatus(): WorkflowStatus { return { ...this.status }; } /** * Get workflow metrics */ getMetrics(): WorkflowMetrics { return { ...this.metrics }; } /** * Get workflow history from memory */ async getWorkflowHistory(limit = 10): Promise<any[]> { return await this.memory.getWorkflowHistory('workflow_execution', limit); } /** * Handle webhook events */ async handleWebhook(event: string, payload: any): Promise<void> { try { this.emit('webhook', { event, payload }); // Delegate to appropriate manager await this.gitHubManager.handleWebhook(event, payload); // Check if this should trigger any automated workflows await this.checkAutomationTriggers(event, payload); } catch (error) { this.emit('error', { type: 'webhook', event, error }); } } /** * Shutdown the orchestrator gracefully */ async shutdown(): Promise<void> { this.emit('status', { type: 'shutdown', message: 'Shutting down workflow orchestrator' }); // Stop health monitoring if (this.healthCheckInterval) { clearInterval(this.healthCheckInterval); } // Wait for active workflows to complete or timeout await this.waitForActiveWorkflows(30000); // 30 seconds timeout // Shutdown memory system await this.memory.shutdown(); this.status.state = 'stopped'; this.emit('status', { type: 'stopped', message: 'Workflow orchestrator stopped' }); } // Private methods private setupEventListeners(): void { // Set up cross-component event handling this.on('workflow:error', this.handleWorkflowError.bind(this)); this.on('memory:error', this.handleMemoryError.bind(this)); } private startHealthMonitoring(): void { this.healthCheckInterval = setInterval(async () => { await this.performHealthCheck(); }, 60000); // Check every minute } private async performHealthCheck(): Promise<void> { try { // Check memory system health const memoryHealth = await this.memory.getHealthStatus(); // Check workflow manager health const workflowHealth = this.checkWorkflowHealth(); // Update overall health status this.status.health = (memoryHealth.healthy && workflowHealth) ? 'healthy' : 'unhealthy'; this.status.lastUpdate = new Date(); this.emit('health:check', { memory: memoryHealth, workflows: workflowHealth, overall: this.status.health }); } catch (error) { this.status.health = 'unhealthy'; this.emit('error', { type: 'health-check', error }); } } private checkWorkflowHealth(): boolean { // Check if we have too many active workflows if (this.status.activeWorkflows.length > 10) { return false; } // Check if any workflows have been running too long const now = Date.now(); const staleWorkflows = this.status.activeWorkflows.filter(w => now - w.startTime.getTime() > 300000 // 5 minutes ); return staleWorkflows.length === 0; } private generateWorkflowId(): string { return `wf_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; } private updateMetrics(success: boolean, executionTime: number): void { this.metrics.totalWorkflows++; if (success) { this.metrics.successfulWorkflows++; } else { this.metrics.failedWorkflows++; } // Update average execution time const totalTime = this.metrics.averageExecutionTime * (this.metrics.totalWorkflows - 1) + executionTime; this.metrics.averageExecutionTime = totalTime / this.metrics.totalWorkflows; // Update memory usage (approximation) this.metrics.memoryUsage = process.memoryUsage().heapUsed; this.metrics.lastMetricsUpdate = new Date(); } private removeActiveWorkflow(workflowId: string): void { this.status.activeWorkflows = this.status.activeWorkflows.filter(w => w.id !== workflowId); } private async shouldTriggerRelease(params: any): Promise<boolean> { // Check configuration-based conditions const config = this.config.features.autoRelease.triggerConditions; // Check commit count condition if (config.commitCount > 0) { // Implementation would check actual commit count return true; // Placeholder } // Check time-based conditions if (config.timeThreshold) { // Implementation would check time since last release return true; // Placeholder } // Check feature flags if (config.featureFlags.length > 0) { // Implementation would check feature completion return true; // Placeholder } return false; } private async generateIssueRecommendations(classification: any): Promise<string[]> { const recommendations: string[] = []; if (classification.priority === 'critical') { recommendations.push('Assign to senior developer immediately'); recommendations.push('Set up monitoring for this issue type'); } if (classification.complexity === 'complex') { recommendations.push('Break down into smaller tasks'); recommendations.push('Consider architectural review'); } if (classification.estimatedHours > 20) { recommendations.push('Consider adding to next sprint planning'); recommendations.push('May require dedicated milestone'); } return recommendations; } private async identifyAutomationOpportunities(classification: any): Promise<string[]> { const opportunities: string[] = []; if (classification.type === 'bug' && classification.component.includes('testing')) { opportunities.push('Add automated test case to prevent regression'); } if (classification.type === 'documentation') { opportunities.push('Consider auto-generating docs from code comments'); } if (classification.component.includes('api')) { opportunities.push('Add API monitoring and alerts'); } return opportunities; } private async generateFeedbackActionItems(analysis: any): Promise<string[]> { const actionItems: string[] = []; // Analyze high-priority negative feedback const criticalFeedback = analysis.items?.filter((item: any) => item.sentiment === 'negative' && item.priority > 7 ) || []; for (const item of criticalFeedback) { actionItems.push(`Address critical feedback: ${item.topics.join(', ')}`); } // Look for common themes const commonTopics = analysis.commonTopics || []; for (const topic of commonTopics) { if (topic.frequency > 3) { actionItems.push(`Investigate recurring topic: ${topic.name}`); } } return actionItems; } private async generateQualityImprovements(results: any): Promise<string[]> { const improvements: string[] = []; if (results.results.coverage?.percentage < 80) { improvements.push('Increase test coverage to at least 80%'); } if (results.results.security?.vulnerabilities > 0) { improvements.push('Address security vulnerabilities before release'); } if (results.results.performance?.issues > 0) { improvements.push('Optimize performance bottlenecks'); } return improvements; } private async generatePipelineSummary(results: any): Promise<any> { const summary = { timestamp: new Date(), status: 'completed', steps: Object.keys(results), successfulSteps: Object.keys(results).filter(key => !results[key].error), failedSteps: Object.keys(results).filter(key => results[key].error), recommendations: [] as string[] }; // Generate recommendations based on results if (results.quality && !results.quality.error && !results.quality.passed) { summary.recommendations.push('Fix quality issues before next release'); } if (results.feedback && !results.feedback.error) { summary.recommendations.push('Review and address user feedback'); } if (results.release?.triggered) { summary.recommendations.push('Monitor release PR for approvals'); } return summary; } private async checkAutomationTriggers(event: string, payload: any): Promise<void> { // Check if this event should trigger any automated workflows const triggers = this.config.integrations.github.webhooks.events; if (triggers.includes(event)) { switch (event) { case 'push': if (payload.ref === `refs/heads/${this.config.repository.branch}`) { await this.executeWorkflow('quality-check', { prNumber: null }); } break; case 'pull_request.opened': await this.executeWorkflow('quality-check', { prNumber: payload.pull_request.number }); break; case 'issues.opened': await this.executeWorkflow('process-issue', { issueNumber: payload.issue.number }); break; } } } private async waitForActiveWorkflows(timeout: number): Promise<void> { const startTime = Date.now(); while (this.status.activeWorkflows.length > 0 && (Date.now() - startTime) < timeout) { await new Promise(resolve => setTimeout(resolve, 1000)); } } private handleWorkflowError(event: WorkflowEvent): void { console.error(`Workflow error in ${event.name}:`, event.error); // Implement error recovery strategies this.attemptErrorRecovery(event); } private handleMemoryError(error: any): void { console.error('Memory system error:', error); // Implement memory recovery strategies this.attemptMemoryRecovery(error); } private async attemptErrorRecovery(event: WorkflowEvent): Promise<void> { // Implement automatic error recovery strategies console.log(`Attempting recovery for workflow ${event.name}`); } private async attemptMemoryRecovery(error: any): Promise<void> { // Implement memory system recovery strategies console.log('Attempting memory system recovery'); } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/henryhawke/mcp-titan'

If you have feedback or need assistance with the MCP directory API, please join our Discord server