Skip to main content
Glama
execution-watchdog.ts19.3 kB
/** * Execution Watchdog System * * Implements timeout monitoring and recovery for agent tasks. * Provides configurable timeout thresholds, automatic detection, * escalation procedures, and health checks. */ import { AgentOrchestrator } from './agent-orchestrator.js'; import { TaskStreamer } from './task-streamer.js'; import { FeedbackProcessor } from './feedback-processor.js'; import { AtomicTask } from '../types/task.js'; import { AppError } from '../../../utils/errors.js'; import logger from '../../../logger.js'; /** * Watchdog configuration per task type */ export interface WatchdogConfig { taskType: string; timeoutMinutes: number; warningThresholdMinutes: number; maxRetries: number; escalationDelayMinutes: number; healthCheckIntervalMinutes: number; } /** * Task monitoring information */ export interface TaskMonitor { taskId: string; agentId: string; startTime: Date; lastHeartbeat: Date; timeoutAt: Date; warningAt: Date; status: 'monitoring' | 'warning' | 'timeout' | 'escalated' | 'recovered'; retryCount: number; escalationLevel: number; taskType: string; estimatedDuration?: number; } /** * Agent health status */ export interface AgentHealth { agentId: string; lastSeen: Date; consecutiveTimeouts: number; totalTasksAssigned: number; totalTasksCompleted: number; averageCompletionTime: number; healthScore: number; status: 'healthy' | 'degraded' | 'unhealthy' | 'offline'; lastHealthCheck: Date; } /** * Watchdog statistics */ export interface WatchdogStats { totalTasksMonitored: number; activeMonitors: number; timeoutsDetected: number; escalationsTriggered: number; recoveredTasks: number; averageTaskDuration: number; agentsMonitored: number; unhealthyAgents: number; lastStatsUpdate: Date; } /** * Escalation action types */ export type EscalationAction = | 'reassign_task' | 'restart_agent' | 'human_intervention' | 'task_cancellation' | 'system_alert'; /** * Escalation procedure */ export interface EscalationProcedure { level: number; action: EscalationAction; delayMinutes: number; description: string; autoExecute: boolean; } /** * Execution Watchdog System */ export class ExecutionWatchdog { private static instance: ExecutionWatchdog | null = null; private configs = new Map<string, WatchdogConfig>(); private monitors = new Map<string, TaskMonitor>(); private agentHealth = new Map<string, AgentHealth>(); private agentOrchestrator: AgentOrchestrator; private taskStreamer: TaskStreamer; private feedbackProcessor: FeedbackProcessor; private watchdogTimer?: NodeJS.Timeout; private healthCheckTimer?: NodeJS.Timeout; private stats: WatchdogStats; private escalationProcedures: EscalationProcedure[]; private constructor() { this.agentOrchestrator = AgentOrchestrator.getInstance(); this.taskStreamer = TaskStreamer.getInstance(); this.feedbackProcessor = FeedbackProcessor.getInstance(); this.stats = { totalTasksMonitored: 0, activeMonitors: 0, timeoutsDetected: 0, escalationsTriggered: 0, recoveredTasks: 0, averageTaskDuration: 0, agentsMonitored: 0, unhealthyAgents: 0, lastStatsUpdate: new Date() }; // Default escalation procedures this.escalationProcedures = [ { level: 1, action: 'reassign_task', delayMinutes: 5, description: 'Reassign task to another agent', autoExecute: true }, { level: 2, action: 'restart_agent', delayMinutes: 10, description: 'Request agent restart', autoExecute: false }, { level: 3, action: 'human_intervention', delayMinutes: 15, description: 'Escalate to human operator', autoExecute: true } ]; this.initializeDefaultConfigs(); this.startWatchdog(); logger.info('Execution watchdog initialized'); } /** * Get singleton instance */ static getInstance(): ExecutionWatchdog { if (!ExecutionWatchdog.instance) { ExecutionWatchdog.instance = new ExecutionWatchdog(); } return ExecutionWatchdog.instance; } /** * Start monitoring a task */ async startMonitoring( taskId: string, agentId: string, task: AtomicTask ): Promise<void> { try { const config = this.getConfigForTaskType(task.type); const now = new Date(); const monitor: TaskMonitor = { taskId, agentId, startTime: now, lastHeartbeat: now, timeoutAt: new Date(now.getTime() + config.timeoutMinutes * 60000), warningAt: new Date(now.getTime() + config.warningThresholdMinutes * 60000), status: 'monitoring', retryCount: 0, escalationLevel: 0, taskType: task.type, estimatedDuration: task.estimatedHours ? task.estimatedHours * 60 : undefined }; this.monitors.set(taskId, monitor); this.stats.totalTasksMonitored++; this.stats.activeMonitors = this.monitors.size; // Initialize agent health if not exists if (!this.agentHealth.has(agentId)) { await this.initializeAgentHealth(agentId); } // Update agent health const agentHealthInfo = this.agentHealth.get(agentId)!; agentHealthInfo.totalTasksAssigned++; agentHealthInfo.lastSeen = now; logger.info({ taskId, agentId, timeoutAt: monitor.timeoutAt, taskType: task.type }, 'Task monitoring started'); } catch (error) { logger.error({ err: error, taskId, agentId }, 'Failed to start task monitoring'); throw new AppError('Task monitoring startup failed', { cause: error }); } } /** * Stop monitoring a task */ async stopMonitoring(taskId: string, completed: boolean = true): Promise<void> { try { const monitor = this.monitors.get(taskId); if (!monitor) { logger.warn({ taskId }, 'Attempted to stop monitoring non-existent task'); return; } const duration = Date.now() - monitor.startTime.getTime(); // Update agent health const agentHealthInfo = this.agentHealth.get(monitor.agentId); if (agentHealthInfo) { if (completed) { agentHealthInfo.totalTasksCompleted++; agentHealthInfo.averageCompletionTime = (agentHealthInfo.averageCompletionTime + duration) / 2; if (monitor.status === 'timeout' || monitor.status === 'escalated') { this.stats.recoveredTasks++; monitor.status = 'recovered'; } } else { agentHealthInfo.consecutiveTimeouts++; } agentHealthInfo.healthScore = this.calculateHealthScore(agentHealthInfo); agentHealthInfo.status = this.determineHealthStatus(agentHealthInfo); } // Update statistics this.stats.averageTaskDuration = (this.stats.averageTaskDuration + duration) / 2; this.monitors.delete(taskId); this.stats.activeMonitors = this.monitors.size; logger.info({ taskId, agentId: monitor.agentId, duration: Math.round(duration / 1000), completed }, 'Task monitoring stopped'); } catch (error) { logger.error({ err: error, taskId }, 'Failed to stop task monitoring'); throw new AppError('Task monitoring shutdown failed', { cause: error }); } } /** * Update task heartbeat */ updateTaskHeartbeat(taskId: string): void { const monitor = this.monitors.get(taskId); if (monitor) { monitor.lastHeartbeat = new Date(); // Reset warning status if task is active again if (monitor.status === 'warning') { monitor.status = 'monitoring'; } // Update agent health const agentHealthInfo = this.agentHealth.get(monitor.agentId); if (agentHealthInfo) { agentHealthInfo.lastSeen = new Date(); agentHealthInfo.consecutiveTimeouts = 0; // Reset timeout counter } logger.debug({ taskId, agentId: monitor.agentId }, 'Task heartbeat updated'); } } /** * Configure timeout for task type */ configureTaskType(config: WatchdogConfig): void { this.configs.set(config.taskType, config); logger.info({ taskType: config.taskType, config }, 'Watchdog configuration updated'); } /** * Get watchdog statistics */ getStats(): WatchdogStats { this.stats.lastStatsUpdate = new Date(); this.stats.agentsMonitored = this.agentHealth.size; this.stats.unhealthyAgents = Array.from(this.agentHealth.values()) .filter(health => health.status === 'unhealthy' || health.status === 'offline').length; return { ...this.stats }; } /** * Get agent health information */ getAgentHealth(agentId?: string): AgentHealth[] { if (agentId) { const health = this.agentHealth.get(agentId); return health ? [health] : []; } return Array.from(this.agentHealth.values()); } /** * Get active task monitors */ getActiveMonitors(): TaskMonitor[] { return Array.from(this.monitors.values()); } /** * Force timeout check (for testing) */ async forceTimeoutCheck(): Promise<void> { await this.checkTimeouts(); } /** * Initialize default configurations */ private initializeDefaultConfigs(): void { const defaultConfigs: WatchdogConfig[] = [ { taskType: 'frontend', timeoutMinutes: 45, warningThresholdMinutes: 30, maxRetries: 2, escalationDelayMinutes: 5, healthCheckIntervalMinutes: 10 }, { taskType: 'backend', timeoutMinutes: 60, warningThresholdMinutes: 40, maxRetries: 2, escalationDelayMinutes: 5, healthCheckIntervalMinutes: 10 }, { taskType: 'database', timeoutMinutes: 30, warningThresholdMinutes: 20, maxRetries: 3, escalationDelayMinutes: 3, healthCheckIntervalMinutes: 5 }, { taskType: 'testing', timeoutMinutes: 20, warningThresholdMinutes: 15, maxRetries: 1, escalationDelayMinutes: 2, healthCheckIntervalMinutes: 5 }, { taskType: 'documentation', timeoutMinutes: 25, warningThresholdMinutes: 18, maxRetries: 1, escalationDelayMinutes: 3, healthCheckIntervalMinutes: 10 }, { taskType: 'general', timeoutMinutes: 30, warningThresholdMinutes: 20, maxRetries: 2, escalationDelayMinutes: 5, healthCheckIntervalMinutes: 10 } ]; defaultConfigs.forEach(config => { this.configs.set(config.taskType, config); }); logger.debug({ configCount: defaultConfigs.length }, 'Default watchdog configurations initialized'); } /** * Get configuration for task type */ private getConfigForTaskType(taskType: string): WatchdogConfig { return this.configs.get(taskType) || this.configs.get('general')!; } /** * Start watchdog monitoring */ private startWatchdog(): void { // Main watchdog timer - check every 30 seconds this.watchdogTimer = setInterval(() => { this.checkTimeouts().catch(error => { logger.error({ err: error }, 'Error in watchdog timeout check'); }); }, 30000); // Health check timer - check every 2 minutes this.healthCheckTimer = setInterval(() => { this.performHealthChecks().catch(error => { logger.error({ err: error }, 'Error in agent health check'); }); }, 120000); logger.debug('Watchdog timers started'); } /** * Check for task timeouts */ private async checkTimeouts(): Promise<void> { const now = new Date(); for (const monitor of this.monitors.values()) { try { // Check for warnings if (monitor.status === 'monitoring' && now >= monitor.warningAt) { monitor.status = 'warning'; logger.warn({ taskId: monitor.taskId, agentId: monitor.agentId, timeoutIn: Math.round((monitor.timeoutAt.getTime() - now.getTime()) / 60000) }, 'Task approaching timeout'); } // Check for timeouts if (monitor.status !== 'timeout' && now >= monitor.timeoutAt) { monitor.status = 'timeout'; this.stats.timeoutsDetected++; logger.error({ taskId: monitor.taskId, agentId: monitor.agentId, duration: Math.round((now.getTime() - monitor.startTime.getTime()) / 60000) }, 'Task timeout detected'); await this.handleTimeout(monitor); } } catch (error) { logger.error({ err: error, taskId: monitor.taskId }, 'Error checking task timeout'); } } } /** * Handle task timeout */ private async handleTimeout(monitor: TaskMonitor): Promise<void> { try { const config = this.getConfigForTaskType(monitor.taskType); // Update agent health const agentHealthInfo = this.agentHealth.get(monitor.agentId); if (agentHealthInfo) { agentHealthInfo.consecutiveTimeouts++; agentHealthInfo.healthScore = this.calculateHealthScore(agentHealthInfo); agentHealthInfo.status = this.determineHealthStatus(agentHealthInfo); } // Check if we should retry if (monitor.retryCount < config.maxRetries) { monitor.retryCount++; await this.retryTask(monitor); } else { // Escalate await this.escalateTask(monitor); } } catch (error) { logger.error({ err: error, taskId: monitor.taskId }, 'Failed to handle task timeout'); } } /** * Retry a timed-out task */ private async retryTask(monitor: TaskMonitor): Promise<void> { try { // Release current task claim await this.taskStreamer.releaseTask(monitor.taskId, monitor.agentId); // Reset monitor timing const config = this.getConfigForTaskType(monitor.taskType); const now = new Date(); monitor.startTime = now; monitor.lastHeartbeat = now; monitor.timeoutAt = new Date(now.getTime() + config.timeoutMinutes * 60000); monitor.warningAt = new Date(now.getTime() + config.warningThresholdMinutes * 60000); monitor.status = 'monitoring'; logger.info({ taskId: monitor.taskId, agentId: monitor.agentId, retryCount: monitor.retryCount }, 'Task retry initiated'); } catch (error) { logger.error({ err: error, taskId: monitor.taskId }, 'Failed to retry task'); } } /** * Escalate a task */ private async escalateTask(monitor: TaskMonitor): Promise<void> { try { monitor.status = 'escalated'; monitor.escalationLevel++; this.stats.escalationsTriggered++; const procedure = this.escalationProcedures[monitor.escalationLevel - 1]; if (procedure) { logger.warn({ taskId: monitor.taskId, agentId: monitor.agentId, escalationLevel: monitor.escalationLevel, action: procedure.action }, 'Task escalation triggered'); if (procedure.autoExecute) { await this.executeEscalationAction(monitor, procedure); } } else { logger.error({ taskId: monitor.taskId, escalationLevel: monitor.escalationLevel }, 'No escalation procedure defined for level'); } } catch (error) { logger.error({ err: error, taskId: monitor.taskId }, 'Failed to escalate task'); } } /** * Execute escalation action */ private async executeEscalationAction( monitor: TaskMonitor, procedure: EscalationProcedure ): Promise<void> { try { switch (procedure.action) { case 'reassign_task': await this.taskStreamer.releaseTask(monitor.taskId, monitor.agentId); logger.info({ taskId: monitor.taskId }, 'Task reassigned due to escalation'); break; case 'human_intervention': logger.error({ taskId: monitor.taskId, agentId: monitor.agentId }, 'HUMAN INTERVENTION REQUIRED: Task escalated to maximum level'); break; case 'task_cancellation': await this.stopMonitoring(monitor.taskId, false); logger.warn({ taskId: monitor.taskId }, 'Task cancelled due to escalation'); break; default: logger.warn({ action: procedure.action }, 'Escalation action not implemented'); } } catch (error) { logger.error({ err: error, action: procedure.action }, 'Failed to execute escalation action'); } } /** * Initialize agent health tracking */ private async initializeAgentHealth(agentId: string): Promise<void> { const health: AgentHealth = { agentId, lastSeen: new Date(), consecutiveTimeouts: 0, totalTasksAssigned: 0, totalTasksCompleted: 0, averageCompletionTime: 0, healthScore: 1.0, status: 'healthy', lastHealthCheck: new Date() }; this.agentHealth.set(agentId, health); logger.debug({ agentId }, 'Agent health tracking initialized'); } /** * Perform agent health checks */ private async performHealthChecks(): Promise<void> { const now = new Date(); for (const health of this.agentHealth.values()) { try { // Check if agent has been seen recently const timeSinceLastSeen = now.getTime() - health.lastSeen.getTime(); const maxIdleTime = 10 * 60 * 1000; // 10 minutes if (timeSinceLastSeen > maxIdleTime) { health.status = 'offline'; } else { health.healthScore = this.calculateHealthScore(health); health.status = this.determineHealthStatus(health); } health.lastHealthCheck = now; } catch (error) { logger.error({ err: error, agentId: health.agentId }, 'Error in agent health check'); } } } /** * Calculate agent health score */ private calculateHealthScore(health: AgentHealth): number { const completionRate = health.totalTasksAssigned > 0 ? health.totalTasksCompleted / health.totalTasksAssigned : 1.0; const timeoutPenalty = Math.min(health.consecutiveTimeouts * 0.2, 0.8); const baseScore = completionRate - timeoutPenalty; return Math.max(0, Math.min(1, baseScore)); } /** * Determine health status from score */ private determineHealthStatus(health: AgentHealth): AgentHealth['status'] { if (health.healthScore >= 0.8) return 'healthy'; if (health.healthScore >= 0.5) return 'degraded'; return 'unhealthy'; } /** * Cleanup resources */ destroy(): void { if (this.watchdogTimer) { clearInterval(this.watchdogTimer); } if (this.healthCheckTimer) { clearInterval(this.healthCheckTimer); } this.monitors.clear(); this.agentHealth.clear(); this.configs.clear(); ExecutionWatchdog.instance = null; logger.info('Execution watchdog destroyed'); } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/freshtechbro/vibe-coder-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server