Skip to main content
Glama
workflow-aware-agent-manager.ts41.6 kB
/** * Workflow-Aware Agent Lifecycle Manager * * Prevents agents from being marked offline during active decomposition and orchestration * processes by implementing workflow-aware heartbeat management and adaptive timeouts. */ import { EventEmitter } from 'events'; import { AgentOrchestrator } from './agent-orchestrator.js'; import { WorkflowStateManager, WorkflowPhase } from './workflow-state-manager.js'; import { DecompositionService } from './decomposition-service.js'; // Timeout manager not used in this implementation import { createErrorContext, ValidationError } from '../utils/enhanced-errors.js'; import { InitializationMonitor } from '../../../utils/initialization-monitor.js'; import logger from '../../../logger.js'; /** * Agent activity types that require extended heartbeat tolerance */ export type AgentActivity = | 'idle' | 'decomposition' | 'orchestration' | 'task_execution' | 'research' | 'context_enrichment' | 'dependency_analysis'; /** * Agent lifecycle state with workflow awareness */ export interface WorkflowAwareAgentState { agentId: string; currentActivity: AgentActivity; activityStartTime: Date; lastHeartbeat: Date; lastProgressUpdate: Date; workflowId?: string; sessionId?: string; expectedDuration?: number; // Expected activity duration in ms progressPercentage: number; isWorkflowCritical: boolean; // True if agent is critical for current workflow extendedTimeoutUntil?: Date; // Extended timeout deadline gracePeriodCount: number; // Number of grace periods used metadata: { workflowPhase?: WorkflowPhase; taskCount?: number; estimatedCompletion?: Date; lastActivityUpdate?: Date; [key: string]: unknown; // Allow additional metadata fields }; } /** * Workflow-aware timeout configuration */ export interface WorkflowTimeoutConfig { baseHeartbeatInterval: number; // Base heartbeat interval (30s) activityTimeoutMultipliers: Record<AgentActivity, number>; // Multipliers per activity maxGracePeriods: number; // Maximum grace periods before marking offline gracePeriodDuration: number; // Duration of each grace period progressUpdateInterval: number; // Required progress update interval workflowCriticalExtension: number; // Extra time for workflow-critical agents enableAdaptiveTimeouts: boolean; // Enable progress-based timeout adjustment } /** * Default workflow timeout configuration */ const DEFAULT_WORKFLOW_TIMEOUT_CONFIG: WorkflowTimeoutConfig = { baseHeartbeatInterval: 30000, // 30 seconds activityTimeoutMultipliers: { idle: 2, // 60 seconds for idle agents decomposition: 20, // 10 minutes for decomposition orchestration: 10, // 5 minutes for orchestration task_execution: 6, // 3 minutes for task execution research: 15, // 7.5 minutes for research context_enrichment: 8, // 4 minutes for context enrichment dependency_analysis: 12 // 6 minutes for dependency analysis }, maxGracePeriods: 3, gracePeriodDuration: 60000, // 1 minute grace periods progressUpdateInterval: 120000, // 2 minutes between progress updates workflowCriticalExtension: 300000, // 5 minutes extra for critical agents enableAdaptiveTimeouts: true }; /** * Workflow-aware agent lifecycle manager */ export class WorkflowAwareAgentManager extends EventEmitter { private static instance: WorkflowAwareAgentManager | null = null; private config: WorkflowTimeoutConfig; private agentStates = new Map<string, WorkflowAwareAgentState>(); private agentOrchestrator: AgentOrchestrator | null = null; private workflowStateManager: WorkflowStateManager | null = null; private decompositionService: DecompositionService | null = null; private monitoringInterval: NodeJS.Timeout | null = null; private isMonitoring = false; private startTime = Date.now(); private constructor(config: Partial<WorkflowTimeoutConfig> = {}) { super(); this.config = { ...DEFAULT_WORKFLOW_TIMEOUT_CONFIG, ...config }; // AgentOrchestrator will be initialized lazily to prevent circular dependency // Initialize workflow state manager and decomposition service with proper getInstance calls try { this.workflowStateManager = WorkflowStateManager.getInstance(); } catch (error) { logger.warn({ err: error }, 'WorkflowStateManager getInstance not available, using null fallback'); this.workflowStateManager = null; } // Initialize decomposition service with config (following TaskRefinementService pattern) // Use async initialization pattern to prevent timing issues this.scheduleAsyncInitialization(); logger.info('Workflow-aware agent manager initialized', { config: this.config }); } /** * Schedule async initialization to prevent timing issues */ private scheduleAsyncInitialization(): void { process.nextTick(() => { this.initializeDecompositionService().then(() => { this.setupEventListeners(); }).catch(error => { logger.warn({ err: error }, 'DecompositionService initialization failed, setting up event listeners with fallback'); this.setupEventListeners(); // Still setup with null service }); }); } /** * Initialize decomposition service with config */ private async initializeDecompositionService(): Promise<void> { try { const { getVibeTaskManagerConfig } = await import('../utils/config-loader.js'); const config = await getVibeTaskManagerConfig(); if (!config) { throw new Error('Failed to load task manager configuration'); } // Convert LLMConfig to OpenRouterConfig format const openRouterConfig = { baseUrl: 'https://openrouter.ai/api/v1', apiKey: process.env.OPENROUTER_API_KEY || '', model: 'anthropic/claude-3-sonnet', geminiModel: 'gemini-pro', perplexityModel: 'llama-3.1-sonar-small-128k-online' }; this.decompositionService = DecompositionService.getInstance(openRouterConfig); } catch (error) { logger.warn({ err: error }, 'DecompositionService initialization failed, using fallback'); this.decompositionService = null; } } /** * Lazily initialize AgentOrchestrator to prevent circular dependency */ private async getAgentOrchestrator(): Promise<AgentOrchestrator> { if (!this.agentOrchestrator) { // Use dynamic import to break circular dependency const { AgentOrchestrator } = await import('./agent-orchestrator.js'); this.agentOrchestrator = AgentOrchestrator.getInstance(); } return this.agentOrchestrator; } /** * Get singleton instance */ static getInstance(config?: Partial<WorkflowTimeoutConfig>): WorkflowAwareAgentManager { if (!WorkflowAwareAgentManager.instance) { const monitor = InitializationMonitor.getInstance(); monitor.startServiceInitialization('WorkflowAwareAgentManager', [ 'WorkflowStateManager', 'DecompositionService' ], { config }); try { monitor.startPhase('WorkflowAwareAgentManager', 'constructor'); WorkflowAwareAgentManager.instance = new WorkflowAwareAgentManager(config); monitor.endPhase('WorkflowAwareAgentManager', 'constructor'); monitor.endServiceInitialization('WorkflowAwareAgentManager'); } catch (error) { monitor.endPhase('WorkflowAwareAgentManager', 'constructor', error as Error); monitor.endServiceInitialization('WorkflowAwareAgentManager', error as Error); throw error; } } return WorkflowAwareAgentManager.instance; } /** * Start workflow-aware monitoring */ async startMonitoring(): Promise<void> { if (this.isMonitoring) { logger.warn('Workflow-aware agent monitoring already active'); return; } try { this.isMonitoring = true; this.startTime = Date.now(); // Start monitoring interval this.monitoringInterval = setInterval(() => { this.performWorkflowAwareHealthCheck().catch(error => { logger.error({ err: error }, 'Error in workflow-aware health check'); }); }, this.config.baseHeartbeatInterval); logger.info('Workflow-aware agent monitoring started', { interval: this.config.baseHeartbeatInterval, enableAdaptiveTimeouts: this.config.enableAdaptiveTimeouts }); } catch (error) { this.isMonitoring = false; const context = createErrorContext('WorkflowAwareAgentManager', 'startMonitoring') .metadata({ config: this.config }) .build(); logger.error({ err: error, context }, 'Failed to start workflow-aware monitoring'); throw new ValidationError('Failed to start workflow-aware monitoring', context); } } /** * Stop monitoring */ async stopMonitoring(): Promise<void> { if (!this.isMonitoring) { return; } try { this.isMonitoring = false; if (this.monitoringInterval) { clearInterval(this.monitoringInterval); this.monitoringInterval = null; } logger.info('Workflow-aware agent monitoring stopped'); } catch (error) { logger.error({ err: error }, 'Error stopping workflow-aware monitoring'); } } /** * Register agent activity */ async registerAgentActivity( agentId: string, activity: AgentActivity, options: { workflowId?: string; sessionId?: string; expectedDuration?: number; isWorkflowCritical?: boolean; metadata?: Record<string, unknown>; } = {} ): Promise<void> { const now = new Date(); const agentState: WorkflowAwareAgentState = { agentId, currentActivity: activity, activityStartTime: now, lastHeartbeat: now, lastProgressUpdate: now, workflowId: options.workflowId, sessionId: options.sessionId, expectedDuration: options.expectedDuration, progressPercentage: 0, isWorkflowCritical: options.isWorkflowCritical || false, gracePeriodCount: 0, metadata: { ...options.metadata, lastActivityUpdate: now } }; // Calculate extended timeout if needed if (activity !== 'idle' && options.isWorkflowCritical) { const baseTimeout = this.calculateActivityTimeout(activity); agentState.extendedTimeoutUntil = new Date(now.getTime() + baseTimeout + this.config.workflowCriticalExtension); } this.agentStates.set(agentId, agentState); logger.info({ agentId, activity, workflowId: options.workflowId, sessionId: options.sessionId, isWorkflowCritical: options.isWorkflowCritical, extendedTimeoutUntil: agentState.extendedTimeoutUntil }, 'Agent activity registered'); // Emit activity change event this.emit('agent_activity_changed', { agentId, activity, timestamp: now, metadata: agentState.metadata }); } /** * Update agent progress */ async updateAgentProgress( agentId: string, progressPercentage: number, metadata?: Record<string, unknown> ): Promise<void> { const agentState = this.agentStates.get(agentId); if (!agentState) { logger.warn({ agentId }, 'Cannot update progress for unregistered agent'); return; } const now = new Date(); agentState.progressPercentage = Math.max(0, Math.min(100, progressPercentage)); agentState.lastProgressUpdate = now; agentState.lastHeartbeat = now; // Progress update counts as heartbeat if (metadata) { agentState.metadata = { ...agentState.metadata, ...metadata, lastActivityUpdate: now }; } // Reset grace period count on progress update agentState.gracePeriodCount = 0; // Adjust timeout based on progress if adaptive timeouts are enabled if (this.config.enableAdaptiveTimeouts && agentState.expectedDuration) { this.adjustTimeoutBasedOnProgress(agentState); } logger.debug({ agentId, progressPercentage, activity: agentState.currentActivity, workflowId: agentState.workflowId }, 'Agent progress updated'); // Emit progress update event this.emit('agent_progress_updated', { agentId, progressPercentage, activity: agentState.currentActivity, timestamp: now, metadata: agentState.metadata }); // Update orchestrator heartbeat this.getAgentOrchestrator().then(orchestrator => { if (orchestrator && typeof orchestrator.updateAgentHeartbeat === 'function') { orchestrator.updateAgentHeartbeat(agentId, 'available'); } }).catch(error => { logger.warn({ err: error, agentId }, 'Failed to update agent heartbeat'); }); } /** * Complete agent activity */ async completeAgentActivity( agentId: string, success: boolean = true, metadata?: Record<string, unknown> ): Promise<void> { const agentState = this.agentStates.get(agentId); if (!agentState) { logger.warn({ agentId }, 'Cannot complete activity for unregistered agent'); return; } const now = new Date(); const duration = now.getTime() - agentState.activityStartTime.getTime(); logger.info({ agentId, activity: agentState.currentActivity, duration: Math.round(duration / 1000), success, workflowId: agentState.workflowId }, 'Agent activity completed'); // Emit activity completion event this.emit('agent_activity_completed', { agentId, activity: agentState.currentActivity, duration, success, timestamp: now, metadata: { ...agentState.metadata, ...metadata } }); // Reset to idle activity await this.registerAgentActivity(agentId, 'idle', { workflowId: agentState.workflowId, sessionId: agentState.sessionId }); } /** * Get agent state */ getAgentState(agentId: string): WorkflowAwareAgentState | undefined { return this.agentStates.get(agentId); } /** * Get all agent states */ getAllAgentStates(): WorkflowAwareAgentState[] { return Array.from(this.agentStates.values()); } /** * Get workflow-aware statistics */ getWorkflowAwareStats(): { totalAgents: number; activeWorkflows: number; agentsByActivity: Record<AgentActivity, number>; criticalAgents: number; agentsInGracePeriod: number; averageProgress: number; } { const states = Array.from(this.agentStates.values()); const agentsByActivity: Record<AgentActivity, number> = { idle: 0, decomposition: 0, orchestration: 0, task_execution: 0, research: 0, context_enrichment: 0, dependency_analysis: 0 }; let criticalAgents = 0; let agentsInGracePeriod = 0; let totalProgress = 0; const activeWorkflows = new Set<string>(); for (const state of states) { agentsByActivity[state.currentActivity]++; if (state.isWorkflowCritical) criticalAgents++; if (state.gracePeriodCount > 0) agentsInGracePeriod++; if (state.workflowId) activeWorkflows.add(state.workflowId); totalProgress += state.progressPercentage; } return { totalAgents: states.length, activeWorkflows: activeWorkflows.size, agentsByActivity, criticalAgents, agentsInGracePeriod, averageProgress: states.length > 0 ? totalProgress / states.length : 0 }; } /** * Setup event listeners for workflow and decomposition events */ private setupEventListeners(): void { // Listen to workflow state changes (with fallback for services that don't support events) try { const workflowStateManagerAny = this.workflowStateManager as EventEmitter; if (typeof workflowStateManagerAny.on === 'function') { workflowStateManagerAny.on('workflow_phase_changed', (data: Record<string, unknown>) => { this.handleWorkflowPhaseChange(data).catch(error => { logger.error({ err: error, data }, 'Error handling workflow phase change'); }); }); workflowStateManagerAny.on('workflow_progress_updated', (data: Record<string, unknown>) => { this.handleWorkflowProgressUpdate(data).catch(error => { logger.error({ err: error, data }, 'Error handling workflow progress update'); }); }); } else { logger.debug('WorkflowStateManager does not support event listeners, using fallback mode'); } } catch (error) { logger.warn({ err: error }, 'Failed to setup workflow state manager event listeners'); } // Listen to decomposition events (with fallback for services that don't support events) try { const decompositionServiceAny = this.decompositionService as EventEmitter; if (decompositionServiceAny && typeof decompositionServiceAny.on === 'function') { decompositionServiceAny.on('decomposition_started', (data: Record<string, unknown>) => { this.handleDecompositionStarted(data).catch(error => { logger.error({ err: error, data }, 'Error handling decomposition started'); }); }); decompositionServiceAny.on('decomposition_progress', (data: Record<string, unknown>) => { this.handleDecompositionProgress(data).catch(error => { logger.error({ err: error, data }, 'Error handling decomposition progress'); }); }); decompositionServiceAny.on('decomposition_completed', (data: Record<string, unknown>) => { this.handleDecompositionCompleted(data).catch(error => { logger.error({ err: error, data }, 'Error handling decomposition completed'); }); }); decompositionServiceAny.on('decomposition_failed', (data: Record<string, unknown>) => { this.handleDecompositionFailed(data).catch(error => { logger.error({ err: error, data }, 'Error handling decomposition failed'); }); }); decompositionServiceAny.on('epic_generation_started', (data: Record<string, unknown>) => { this.handleEpicGenerationStarted(data).catch(error => { logger.error({ err: error, data }, 'Error handling epic generation started'); }); }); decompositionServiceAny.on('epic_generation_completed', (data: Record<string, unknown>) => { this.handleEpicGenerationCompleted(data).catch(error => { logger.error({ err: error, data }, 'Error handling epic generation completed'); }); }); decompositionServiceAny.on('task_list_started', (data: Record<string, unknown>) => { this.handleTaskListStarted(data).catch(error => { logger.error({ err: error, data }, 'Error handling task list started'); }); }); decompositionServiceAny.on('task_list_completed', (data: Record<string, unknown>) => { this.handleTaskListCompleted(data).catch(error => { logger.error({ err: error, data }, 'Error handling task list completed'); }); }); } else { logger.debug('DecompositionService does not support event listeners, using fallback mode'); } } catch (error) { logger.warn({ err: error }, 'Failed to setup decomposition service event listeners'); } logger.debug('Event listeners setup for workflow-aware agent management'); } /** * Perform workflow-aware health check */ private async performWorkflowAwareHealthCheck(): Promise<void> { const now = new Date(); for (const [agentId, agentState] of this.agentStates.entries()) { try { const shouldMarkOffline = await this.shouldMarkAgentOffline(agentState, now); if (shouldMarkOffline) { await this.handleAgentTimeout(agentState, now); } else { // Check if agent needs progress update reminder const timeSinceProgress = now.getTime() - agentState.lastProgressUpdate.getTime(); if (timeSinceProgress > this.config.progressUpdateInterval && agentState.currentActivity !== 'idle') { this.emit('agent_progress_reminder', { agentId, activity: agentState.currentActivity, timeSinceProgress, timestamp: now }); } } } catch (error) { logger.error({ err: error, agentId }, 'Error in workflow-aware health check for agent'); } } } /** * Determine if agent should be marked offline */ private async shouldMarkAgentOffline(agentState: WorkflowAwareAgentState, now: Date): Promise<boolean> { const timeSinceHeartbeat = now.getTime() - agentState.lastHeartbeat.getTime(); const activityTimeout = this.calculateActivityTimeout(agentState.currentActivity); // Check if we're within extended timeout period if (agentState.extendedTimeoutUntil && now < agentState.extendedTimeoutUntil) { return false; } // Check if we're within grace period if (agentState.gracePeriodCount < this.config.maxGracePeriods) { if (timeSinceHeartbeat > activityTimeout) { // Enter grace period agentState.gracePeriodCount++; const gracePeriodEnd = new Date(now.getTime() + this.config.gracePeriodDuration); logger.warn({ agentId: agentState.agentId, activity: agentState.currentActivity, gracePeriod: agentState.gracePeriodCount, maxGracePeriods: this.config.maxGracePeriods, gracePeriodEnd }, 'Agent entered grace period'); this.emit('agent_grace_period', { agentId: agentState.agentId, gracePeriod: agentState.gracePeriodCount, gracePeriodEnd, timestamp: now }); return false; // Don't mark offline yet } } // Mark offline if exceeded all grace periods return timeSinceHeartbeat > activityTimeout + (this.config.gracePeriodDuration * this.config.maxGracePeriods); } /** * Calculate timeout for specific activity */ private calculateActivityTimeout(activity: AgentActivity): number { const multiplier = this.config.activityTimeoutMultipliers[activity] || 2; return this.config.baseHeartbeatInterval * multiplier; } /** * Adjust timeout based on progress */ private adjustTimeoutBasedOnProgress(agentState: WorkflowAwareAgentState): void { if (!agentState.expectedDuration || agentState.progressPercentage === 0) { return; } const progressRatio = agentState.progressPercentage / 100; const elapsedTime = Date.now() - agentState.activityStartTime.getTime(); const estimatedTotalTime = elapsedTime / progressRatio; const estimatedRemainingTime = estimatedTotalTime - elapsedTime; // Extend timeout if we have good progress and need more time if (progressRatio > 0.1 && estimatedRemainingTime > 0) { const bufferTime = estimatedRemainingTime * 0.5; // 50% buffer agentState.extendedTimeoutUntil = new Date(Date.now() + estimatedRemainingTime + bufferTime); logger.debug({ agentId: agentState.agentId, progressRatio, estimatedRemainingTime, extendedTimeoutUntil: agentState.extendedTimeoutUntil }, 'Adjusted timeout based on progress'); } } /** * Handle agent timeout */ private async handleAgentTimeout(agentState: WorkflowAwareAgentState, now: Date): Promise<void> { logger.warn({ agentId: agentState.agentId, activity: agentState.currentActivity, workflowId: agentState.workflowId, gracePeriodCount: agentState.gracePeriodCount, isWorkflowCritical: agentState.isWorkflowCritical }, 'Agent timeout detected - marking offline'); // Emit timeout event this.emit('agent_timeout', { agentId: agentState.agentId, activity: agentState.currentActivity, workflowId: agentState.workflowId, gracePeriodCount: agentState.gracePeriodCount, timestamp: now }); // Mark agent as offline in orchestrator this.getAgentOrchestrator().then(orchestrator => { if (orchestrator && typeof orchestrator.updateAgentHeartbeat === 'function') { orchestrator.updateAgentHeartbeat(agentState.agentId, 'offline'); } }).catch(error => { logger.warn({ err: error, agentId: agentState.agentId }, 'Failed to update agent heartbeat to offline'); }); // Remove from our tracking this.agentStates.delete(agentState.agentId); } /** * Handle workflow phase change */ private async handleWorkflowPhaseChange(data: Record<string, unknown>): Promise<void> { const { workflowId, sessionId, fromPhase, toPhase, agentId } = data; if (!agentId || typeof agentId !== 'string') return; const agentState = this.agentStates.get(agentId); if (!agentState) return; // Update agent activity based on workflow phase let newActivity: AgentActivity = 'idle'; let isWorkflowCritical = false; switch (toPhase) { case WorkflowPhase.DECOMPOSITION: newActivity = 'decomposition'; isWorkflowCritical = true; break; case WorkflowPhase.ORCHESTRATION: newActivity = 'orchestration'; isWorkflowCritical = true; break; case WorkflowPhase.EXECUTION: newActivity = 'task_execution'; isWorkflowCritical = false; break; default: newActivity = 'idle'; isWorkflowCritical = false; } await this.registerAgentActivity(agentId, newActivity, { workflowId: workflowId as string, sessionId: sessionId as string, isWorkflowCritical, metadata: { workflowPhase: toPhase, previousPhase: fromPhase } }); } /** * Handle workflow progress update */ private async handleWorkflowProgressUpdate(data: Record<string, unknown>): Promise<void> { const { workflowId, sessionId, progress, agentId } = data; if (!agentId || typeof agentId !== 'string' || typeof progress !== 'number') return; await this.updateAgentProgress(agentId, progress, { workflowId: workflowId as string, sessionId: sessionId as string, lastWorkflowUpdate: new Date() }); } /** * Handle decomposition started */ private async handleDecompositionStarted(data: Record<string, unknown>): Promise<void> { const { sessionId, agentId, taskId, projectId, originalSessionId, jobId } = data; if (!agentId || typeof agentId !== 'string') return; await this.registerAgentActivity(agentId, 'decomposition', { sessionId: sessionId as string, workflowId: sessionId as string, // Use sessionId as workflowId for decomposition isWorkflowCritical: true, expectedDuration: 10 * 60 * 1000, // 10 minutes expected metadata: { taskId, projectId, decompositionStarted: new Date() } }); // NEW: Bridge to stdio communication for decomposition started if (originalSessionId && jobId) { const bridgeStartTime = Date.now(); try { const { sseNotifier } = await import('../../../services/sse-notifier/index.js'); const { JobStatus } = await import('../../../services/job-manager/index.js'); sseNotifier.sendProgress( originalSessionId as string, jobId as string, JobStatus.RUNNING, 'Decomposition started - analyzing task complexity', 0 ); const bridgeExecutionTime = Date.now() - bridgeStartTime; logger.info({ originalSessionId, jobId, step: 'decomposition_started', bridgeExecutionTime, performance: { bridgeLatency: bridgeExecutionTime, timestamp: new Date().toISOString() } }, 'Progress bridge: decomposition started forwarded to stdio client'); } catch (error) { const bridgeExecutionTime = Date.now() - bridgeStartTime; logger.error({ err: error, originalSessionId, jobId, bridgeExecutionTime, errorType: error instanceof Error ? error.constructor.name : 'Unknown', errorMessage: error instanceof Error ? error.message : String(error) }, 'Progress bridge: failed to send decomposition started to stdio client'); } } } /** * Handle decomposition progress */ private async handleDecompositionProgress(data: Record<string, unknown>): Promise<void> { const { sessionId, agentId, progress, originalSessionId, jobId, step } = data; if (!agentId || typeof agentId !== 'string' || typeof progress !== 'number') return; await this.updateAgentProgress(agentId, progress, { sessionId: sessionId as string, lastDecompositionUpdate: new Date() }); // NEW: Bridge to stdio communication for decomposition progress if (originalSessionId && jobId) { const bridgeStartTime = Date.now(); try { const { sseNotifier } = await import('../../../services/sse-notifier/index.js'); const { JobStatus } = await import('../../../services/job-manager/index.js'); // Create descriptive progress message based on step let progressMessage = `Decomposition ${progress}% complete`; const metadata = data.metadata as Record<string, unknown> || {}; if (step) { switch (step as string) { case 'decomposition_started': progressMessage = 'Decomposition started - analyzing task complexity'; break; case 'context_enrichment_completed': progressMessage = 'Context analysis complete - beginning decomposition'; break; case 'epic_generation_started': progressMessage = `Epic identification started - analyzing ${metadata.taskCount || 0} tasks`; break; case 'epic_generation_completed': progressMessage = `Epic identification completed - functional areas identified`; break; case 'task_persistence_started': progressMessage = `Task persistence started - saving ${metadata.taskCount || 0} tasks`; break; case 'task_persisted': progressMessage = metadata.message as string || `Task ${metadata.persistedCount}/${metadata.totalTasks} persisted`; break; case 'dependency_analysis_started': progressMessage = `Dependency analysis started - mapping task relationships`; break; case 'dependency_analysis_completed': progressMessage = 'Dependency analysis completed - task graph generated'; break; case 'decomposition_completed': progressMessage = `Decomposition completed - ${metadata.persistedTasks || 0} tasks ready`; break; case 'task_processing': progressMessage = `Processing ${metadata.currentTaskTitle || 'task'} - ${metadata.processedTasks}/${metadata.totalTasks}`; break; default: progressMessage = `Decomposition ${progress}% complete - ${step}`; } } sseNotifier.sendProgress( originalSessionId as string, jobId as string, JobStatus.RUNNING, progressMessage, progress as number ); const bridgeExecutionTime = Date.now() - bridgeStartTime; logger.info({ originalSessionId, jobId, progress, step, progressMessage, bridgeExecutionTime, performance: { bridgeLatency: bridgeExecutionTime, progressValue: progress, timestamp: new Date().toISOString() } }, 'Progress bridge: decomposition progress forwarded to stdio client'); } catch (error) { const bridgeExecutionTime = Date.now() - bridgeStartTime; logger.error({ err: error, originalSessionId, jobId, progress, step, bridgeExecutionTime, errorType: error instanceof Error ? error.constructor.name : 'Unknown', errorMessage: error instanceof Error ? error.message : String(error) }, 'Progress bridge: failed to send decomposition progress to stdio client'); } } } /** * Handle decomposition completed */ private async handleDecompositionCompleted(data: Record<string, unknown>): Promise<void> { const { sessionId, agentId, status, originalSessionId, jobId, totalTasks, persistedTasks } = data; // Extract success from status field (since decomposition events use 'status' not 'success') const success = status === 'completed'; if (!agentId || typeof agentId !== 'string') return; await this.completeAgentActivity(agentId, success as boolean, { sessionId: sessionId as string, decompositionCompleted: new Date() }); // NEW: Bridge to stdio communication for decomposition completed if (originalSessionId && jobId) { try { const { sseNotifier } = await import('../../../services/sse-notifier/index.js'); const { JobStatus } = await import('../../../services/job-manager/index.js'); // Create completion message with task count information const taskCount = persistedTasks || totalTasks || 0; const completionMessage = success ? `Decomposition completed successfully - ${taskCount} tasks generated` : 'Decomposition completed with issues - check results for details'; sseNotifier.sendProgress( originalSessionId as string, jobId as string, success ? JobStatus.COMPLETED : JobStatus.FAILED, completionMessage, 100 ); logger.debug({ originalSessionId, jobId, success, taskCount, completionMessage }, 'Progress bridge: forwarded decomposition completion to stdio client'); } catch (error) { logger.warn({ err: error, originalSessionId, jobId, success }, 'Failed to send decomposition completion to stdio client'); } } } /** * Handle decomposition failed */ private async handleDecompositionFailed(data: Record<string, unknown>): Promise<void> { const { sessionId, agentId, error, originalSessionId, jobId } = data; if (!agentId || typeof agentId !== 'string') return; await this.completeAgentActivity(agentId, false, { sessionId: sessionId as string, decompositionFailed: new Date(), error: error as Record<string, unknown> }); // Bridge to stdio communication for decomposition failed if (originalSessionId && jobId) { try { const { sseNotifier } = await import('../../../services/sse-notifier/index.js'); const { JobStatus } = await import('../../../services/job-manager/index.js'); const errorMessage = error && typeof error === 'object' && 'message' in error ? `Decomposition failed: ${error.message}` : 'Decomposition failed - check logs for details'; sseNotifier.sendProgress( originalSessionId as string, jobId as string, JobStatus.FAILED, errorMessage, 0 ); logger.debug({ originalSessionId, jobId, error }, 'Progress bridge: forwarded decomposition failure to stdio client'); } catch (bridgeError) { logger.warn({ err: bridgeError, originalSessionId, jobId, error }, 'Failed to send decomposition failure to stdio client'); } } } /** * Handle epic generation started */ private async handleEpicGenerationStarted(data: Record<string, unknown>): Promise<void> { const { agentId, originalSessionId, jobId, metadata } = data; if (!agentId || typeof agentId !== 'string') return; // Bridge to stdio communication for epic generation started if (originalSessionId && jobId) { try { const { sseNotifier } = await import('../../../services/sse-notifier/index.js'); const { JobStatus } = await import('../../../services/job-manager/index.js'); const taskCount = metadata && typeof metadata === 'object' && 'taskCount' in metadata ? metadata.taskCount : 'multiple'; sseNotifier.sendProgress( originalSessionId as string, jobId as string, JobStatus.RUNNING, `Epic generation started - organizing ${taskCount} tasks`, 82 ); logger.debug({ originalSessionId, jobId, taskCount }, 'Progress bridge: forwarded epic generation started to stdio client'); } catch (error) { logger.warn({ err: error, originalSessionId, jobId }, 'Failed to send epic generation started to stdio client'); } } } /** * Handle epic generation completed */ private async handleEpicGenerationCompleted(data: Record<string, unknown>): Promise<void> { const { agentId, status, originalSessionId, jobId, metadata } = data; if (!agentId || typeof agentId !== 'string') return; const success = status === 'completed'; // Bridge to stdio communication for epic generation completed if (originalSessionId && jobId) { try { const { sseNotifier } = await import('../../../services/sse-notifier/index.js'); const { JobStatus } = await import('../../../services/job-manager/index.js'); const taskCount = metadata && typeof metadata === 'object' && 'taskCount' in metadata ? metadata.taskCount : 'multiple'; const completionMessage = success ? `Epic generation completed - ${taskCount} tasks organized into epics` : 'Epic generation completed with issues - tasks may use default epic'; sseNotifier.sendProgress( originalSessionId as string, jobId as string, JobStatus.RUNNING, completionMessage, 85 ); logger.debug({ originalSessionId, jobId, success, taskCount, completionMessage }, 'Progress bridge: forwarded epic generation completion to stdio client'); } catch (error) { logger.warn({ err: error, originalSessionId, jobId, success }, 'Failed to send epic generation completion to stdio client'); } } } /** * Handle task list started */ private async handleTaskListStarted(data: Record<string, unknown>): Promise<void> { const { agentId, originalSessionId, jobId } = data; if (!agentId || typeof agentId !== 'string') return; // Bridge to stdio communication for task list processing started if (originalSessionId && jobId) { try { const { sseNotifier } = await import('../../../services/sse-notifier/index.js'); const { JobStatus } = await import('../../../services/job-manager/index.js'); sseNotifier.sendProgress( originalSessionId as string, jobId as string, JobStatus.RUNNING, 'Task list processing started - persisting tasks to storage', 86 ); logger.debug({ originalSessionId, jobId }, 'Progress bridge: forwarded task list started to stdio client'); } catch (error) { logger.warn({ err: error, originalSessionId, jobId }, 'Failed to send task list started to stdio client'); } } } /** * Handle task list completed */ private async handleTaskListCompleted(data: Record<string, unknown>): Promise<void> { const { agentId, originalSessionId, jobId, totalTasks, persistedTasks } = data; if (!agentId || typeof agentId !== 'string') return; // Bridge to stdio communication for task list processing completed if (originalSessionId && jobId) { try { const { sseNotifier } = await import('../../../services/sse-notifier/index.js'); const { JobStatus } = await import('../../../services/job-manager/index.js'); const taskCount = persistedTasks || totalTasks || 0; const completionMessage = `Task list processing completed - ${taskCount} tasks persisted to storage`; sseNotifier.sendProgress( originalSessionId as string, jobId as string, JobStatus.RUNNING, completionMessage, 90 ); logger.debug({ originalSessionId, jobId, taskCount, completionMessage }, 'Progress bridge: forwarded task list completion to stdio client'); } catch (error) { logger.warn({ err: error, originalSessionId, jobId, taskCount: persistedTasks || totalTasks || 0 }, 'Failed to send task list completion to stdio client'); } } } /** * Dispose of the manager */ dispose(): void { this.stopMonitoring(); this.removeAllListeners(); this.agentStates.clear(); logger.info('Workflow-aware agent manager disposed'); } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/freshtechbro/vibe-coder-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server