agent-orchestrator.ts•98.9 kB
/**
* Agent Orchestrator Service
*
* Manages agent communication, coordination, and task assignment.
* Handles multi-agent scenarios with load balancing and conflict resolution.
*
* TODO: This service will be consolidated in Phase 2 - temporarily disabling
* type checking to focus on DI container foundation.
*/
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-nocheck
import { AtomicTask, TaskPriority } from '../types/task.js';
import { ProjectContext } from '../types/project-context.js';
import { SentinelProtocol, AgentResponse } from '../cli/sentinel-protocol.js';
import type { WebSocketServerManager } from '../../../services/websocket-server/index.js';
import type { HTTPAgentAPIServer } from '../../../services/http-agent-api/index.js';
type SSENotifier = typeof import('../../../services/sse-notifier/index.js').sseNotifier;
import type { TaskAssignment as AgentTasksAssignment } from '../../agent-tasks/index.js';
import {
EnhancedError,
AgentError,
TaskExecutionError,
ValidationError,
ResourceError,
createErrorContext
} from '../utils/enhanced-errors.js';
import { AppError } from '../../../utils/errors.js';
import { MemoryManager } from '../../code-map-generator/cache/memoryManager.js';
import { transportManager } from '../../../services/transport-manager/index.js';
import { getTimeoutManager, TaskComplexity } from '../utils/timeout-manager.js';
import { AgentIntegrationBridge } from './agent-integration-bridge.js';
import { WorkflowAwareAgentManager } from './workflow-aware-agent-manager.js';
import { OperationCircuitBreaker } from '../../../utils/operation-circuit-breaker.js';
import { InitializationMonitor } from '../../../utils/initialization-monitor.js';
import logger from '../../../logger.js';
/**
* Agent capability types
*/
export type AgentCapability =
| 'frontend' | 'backend' | 'database' | 'testing' | 'devops'
| 'documentation' | 'refactoring' | 'debugging' | 'general';
/**
* Task queue interface for agent orchestrator
* Uses any for compatibility with different TaskAssignment interfaces
*/
interface TaskQueueInterface {
addTask: (agentId: string, taskAssignment: any) => Promise<string>; // eslint-disable-line @typescript-eslint/no-explicit-any
}
/**
* Agent registration information
*/
export interface AgentInfo {
id: string;
name: string;
capabilities: AgentCapability[];
maxConcurrentTasks: number;
currentTasks: string[];
status: 'available' | 'busy' | 'offline' | 'error';
lastHeartbeat: Date;
performance: {
tasksCompleted: number;
averageCompletionTime: number;
successRate: number;
lastTaskCompletedAt?: Date;
};
metadata: {
version: string;
supportedProtocols: string[];
preferences: Record<string, unknown>;
};
}
/**
* Unified task assignment information
* Consolidates all task assignment data across different systems
*/
export interface TaskAssignment {
/** Assignment ID */
id?: string;
/** Task ID being assigned */
taskId: string;
/** Full task object for comprehensive access */
task: AtomicTask;
/** Agent ID receiving the assignment */
agentId: string;
/** Assignment timestamp */
assignedAt: Date;
/** Expected completion time */
expectedCompletionAt: Date;
/** Assignment status */
status: 'assigned' | 'in_progress' | 'completed' | 'failed' | 'timeout';
/** Number of assignment attempts */
attempts: number;
/** Last status update timestamp */
lastStatusUpdate: Date;
/** Assignment priority */
priority: 'low' | 'normal' | 'high' | 'urgent';
/** Estimated duration in milliseconds */
estimatedDuration?: number;
/** Assignment deadline */
deadline?: Date;
/** Sentinel protocol payload for agent communication */
sentinelPayload?: string;
/** Assignment context */
context?: {
projectId: string;
epicId?: string;
dependencies: string[];
resources?: string[];
constraints?: string[];
};
/** Assignment metadata */
metadata?: {
assignedBy?: string;
assignedAt?: number;
executionId?: string;
retryCount?: number;
maxRetries?: number;
[key: string]: unknown;
};
}
/**
* Task execution options
*/
export interface ExecutionOptions {
/** Force execution even if agent is busy */
force?: boolean;
/** Task priority override */
priority?: 'low' | 'medium' | 'high' | 'critical';
/** Session ID for tracking */
sessionId?: string;
/** Execution timeout in milliseconds */
timeout?: number;
/** Maximum retry attempts */
maxRetries?: number;
/** Enable real-time progress monitoring */
enableMonitoring?: boolean;
}
/**
* Task execution result
*/
export interface TaskExecutionResult {
/** Whether execution was successful */
success: boolean;
/** Task assignment information */
assignment?: TaskAssignment;
/** Execution status */
status: 'completed' | 'failed' | 'timeout' | 'queued' | 'in_progress';
/** Result message */
message: string;
/** Execution start time */
startTime?: Date;
/** Execution end time */
endTime?: Date;
/** Agent response details */
agentResponse?: AgentResponse;
/** Error information if failed */
error?: string;
/** Whether task was queued for later execution */
queued?: boolean;
/** Execution metadata */
metadata?: {
executionId: string;
attempts: number;
totalDuration?: number;
agentId?: string;
};
}
/**
* Agent communication channel interface
*/
export interface AgentCommunicationChannel {
/** Send task to agent */
sendTask(agentId: string, taskPayload: string): Promise<boolean>;
/** Receive response from agent */
receiveResponse(agentId: string, timeout?: number): Promise<string>;
/** Check if agent is reachable */
isAgentReachable(agentId: string): Promise<boolean>;
/** Close communication channel */
close(): Promise<void>;
}
/**
* Agent orchestration configuration
*/
export interface OrchestratorConfig {
heartbeatInterval: number;
taskTimeout: number;
maxRetries: number;
loadBalancingStrategy: 'round_robin' | 'capability_based' | 'performance_based';
enableHealthChecks: boolean;
conflictResolutionStrategy: 'queue' | 'reassign' | 'parallel';
heartbeatTimeoutMultiplier: number; // Multiplier for heartbeat timeout (default: 3)
enableAdaptiveTimeouts: boolean; // Enable complexity-based timeout adjustment
maxHeartbeatMisses: number; // Maximum missed heartbeats before marking offline
}
/**
* Universal Agent Communication Channel
* Supports stdio, SSE, WebSocket, and HTTP transports
* Provides unified communication across all transport types
*/
class UniversalAgentCommunicationChannel implements AgentCommunicationChannel {
private agentRegistry: unknown | null = null;
private taskQueue: TaskQueueInterface | null = null;
private responseProcessor: unknown | null = null;
private websocketServer: WebSocketServerManager | null = null;
private httpAgentAPI: HTTPAgentAPIServer | null = null;
private sseNotifier: SSENotifier | null = null;
private isInitialized: boolean = false;
private dependenciesPromise: Promise<void> | null = null;
constructor() {
// Defer async initialization to prevent recursion during constructor
this.scheduleAsyncInitialization();
}
/**
* Schedule async initialization to prevent recursion during constructor
*/
private scheduleAsyncInitialization(): void {
process.nextTick(() => {
this.dependenciesPromise = this.initializeDependencies().catch(error => {
logger.error({ err: error }, 'Failed to initialize UniversalAgentCommunicationChannel dependencies');
});
});
}
/**
* Ensure dependencies are ready before any operation
*/
private async ensureDependencies(): Promise<void> {
if (this.dependenciesPromise) {
await this.dependenciesPromise;
}
}
private async initializeDependencies(): Promise<void> {
try {
// Import transport services
const { websocketServer } = await import('../../../services/websocket-server/index.js');
const { httpAgentAPI } = await import('../../../services/http-agent-api/index.js');
const { sseNotifier } = await import('../../../services/sse-notifier/index.js');
this.websocketServer = websocketServer;
this.httpAgentAPI = httpAgentAPI;
this.sseNotifier = sseNotifier;
// Ensure transport services are started via transport manager
await this.ensureTransportServicesStarted();
// Log transport endpoint information using dynamic port allocation
this.logTransportEndpoints();
// Import agent modules directly (circular dependencies resolved by DI container)
try {
// Use direct imports instead of ImportCycleBreaker
const AgentRegistryModule = await import('../../agent-registry/index.js');
const AgentTaskQueueModule = await import('../../agent-tasks/index.js');
const AgentResponseProcessorModule = await import('../../agent-response/index.js');
// Extract classes from modules and get instances
const AgentRegistry = AgentRegistryModule?.AgentRegistry?.getInstance();
const AgentTaskQueue = AgentTaskQueueModule?.AgentTaskQueue?.getInstance();
const AgentResponseProcessor = AgentResponseProcessorModule?.AgentResponseProcessor?.getInstance();
if (AgentRegistry && AgentTaskQueue && AgentResponseProcessor) {
this.agentRegistry = AgentRegistry;
this.taskQueue = AgentTaskQueue;
this.responseProcessor = AgentResponseProcessor;
logger.info('Universal agent communication channel initialized with all transports and agent modules');
} else {
logger.warn('Some agent modules could not be imported due to circular dependencies, using fallback implementations');
// Use fallback implementations for missing modules
if (AgentRegistry) {
this.agentRegistry = AgentRegistry;
} else {
this.agentRegistry = this.createFallbackAgentRegistry();
}
if (AgentTaskQueue) {
this.taskQueue = AgentTaskQueue;
} else {
this.taskQueue = this.createFallbackTaskQueue();
}
if (AgentResponseProcessor) {
this.responseProcessor = AgentResponseProcessor;
} else {
this.responseProcessor = this.createFallbackResponseProcessor();
}
logger.info('Universal agent communication channel initialized with mixed agent modules and fallbacks');
}
} catch (agentModuleError) {
logger.warn({ err: agentModuleError }, 'Agent modules not available, using fallback implementations');
// Fallback implementations
this.agentRegistry = this.createFallbackAgentRegistry();
this.taskQueue = this.createFallbackTaskQueue();
this.responseProcessor = this.createFallbackResponseProcessor();
logger.info('Universal agent communication channel initialized with fallback agent modules');
}
this.isInitialized = true;
} catch (error) {
logger.error({ err: error }, 'Failed to initialize universal communication channel');
// Create minimal fallback implementations
this.websocketServer = null;
this.httpAgentAPI = null;
this.sseNotifier = null;
this.agentRegistry = this.createFallbackAgentRegistry();
this.taskQueue = this.createFallbackTaskQueue();
this.responseProcessor = this.createFallbackResponseProcessor();
this.isInitialized = true;
logger.warn('Universal agent communication channel initialized with minimal fallback implementations');
}
}
/**
* Create fallback agent registry
*/
private createFallbackAgentRegistry(): { getAgent: (agentId: string) => Promise<{ id: string; transportType: string; status: string; lastSeen: number; httpEndpoint?: string; metadata?: { preferences?: { sessionId?: string } } } | null>; getInstance: () => unknown } {
return {
getAgent: async (agentId: string) => {
logger.debug({ agentId }, 'Fallback agent registry: getAgent called');
return {
id: agentId,
transportType: 'stdio',
status: 'online',
lastSeen: Date.now(),
httpEndpoint: undefined
};
},
getInstance: () => this.agentRegistry
};
}
/**
* Create fallback task queue
*/
private createFallbackTaskQueue(): TaskQueueInterface {
const fallbackQueue = new Map<string, Array<any>>(); // eslint-disable-line @typescript-eslint/no-explicit-any
return {
addTask: async (agentId: string, taskAssignment: any) => { // eslint-disable-line @typescript-eslint/no-explicit-any
logger.debug({ agentId, taskAssignment }, 'Fallback task queue: addTask called');
if (!fallbackQueue.has(agentId)) {
fallbackQueue.set(agentId, []);
}
fallbackQueue.get(agentId)!.push(taskAssignment);
return `task-${Date.now()}`;
},
};
}
/**
* Create fallback response processor
*/
private createFallbackResponseProcessor(): { getAgentResponses: (agentId: string) => Promise<AgentResponse[]> } {
return {
getAgentResponses: async (agentId: string) => {
logger.debug({ agentId }, 'Fallback response processor: getAgentResponses called');
return [];
}
};
}
async sendTask(agentId: string, taskPayload: string): Promise<boolean> {
try {
// Ensure dependencies are ready before operation
await this.ensureDependencies();
// Verify agent exists and is registered
const agent = await (this.agentRegistry as { getAgent: (agentId: string) => Promise<unknown> })?.getAgent(agentId);
if (!agent) {
logger.error({ agentId }, 'Agent not found - cannot send task');
return false;
}
// Parse task ID from payload for tracking
const taskId = this.extractTaskIdFromPayload(taskPayload);
// Create task assignment
const taskAssignment: Omit<AgentTasksAssignment, 'taskId' | 'assignedAt'> = {
agentId: agentId, // Add required field
sentinelPayload: taskPayload,
priority: 'normal' as const,
estimatedDuration: 1800000, // 30 minutes default
metadata: {
assignedBy: 'agent-orchestrator',
assignedAt: Date.now()
}
};
// Route task based on transport type
let success = false;
switch (agent.transportType) {
case 'stdio':
// Add task to queue for polling
await this.taskQueue?.addTask(agentId, taskAssignment);
success = true;
break;
case 'sse': {
// Add task to queue for polling AND send immediate SSE notification
await this.taskQueue?.addTask(agentId, taskAssignment);
// Send immediate SSE notification if agent has active session
const sessionId = agent.metadata?.preferences?.sessionId;
if (this.sseNotifier && sessionId) {
try {
await this.sseNotifier.sendEvent(sessionId, 'taskAssigned', {
agentId,
taskId,
taskPayload,
priority: taskAssignment.priority,
assignedAt: taskAssignment.metadata?.assignedAt || Date.now(),
deadline: (typeof taskAssignment.metadata?.assignedAt === 'number' ? taskAssignment.metadata.assignedAt : Date.now()) + (24 * 60 * 60 * 1000),
metadata: taskAssignment.metadata
});
logger.info({ agentId, taskId, sessionId }, 'Task sent to agent via SSE notification');
// Also broadcast task assignment update for monitoring
await this.sseNotifier.broadcastEvent('taskAssignmentUpdate', {
agentId,
taskId,
priority: taskAssignment.priority,
assignedAt: taskAssignment.metadata?.assignedAt || Date.now(),
transportType: 'sse'
});
} catch (sseError) {
logger.warn({ err: sseError, agentId, taskId }, 'SSE task notification failed, task still queued for polling');
}
} else {
logger.debug({
agentId,
taskId,
hasSSENotifier: !!this.sseNotifier,
hasSessionId: !!sessionId
}, 'SSE notification not available, task queued for polling only');
}
success = true;
break;
}
case 'websocket':
// Send directly via WebSocket
if (this.websocketServer && this.websocketServer.isAgentConnected(agentId)) {
try {
success = await this.sendTaskViaWebSocket(
agentId,
taskId,
taskPayload,
taskAssignment.priority,
Date.now()
);
if (success) {
logger.info({ agentId, taskId }, 'Task sent to agent via WebSocket');
} else {
logger.warn({ agentId, taskId }, 'WebSocket task delivery returned false, falling back to task queue');
await this.taskQueue?.addTask(agentId, taskAssignment);
success = true;
}
} catch (error) {
logger.warn({ err: error, agentId }, 'WebSocket task delivery failed, falling back to task queue');
// Fallback to task queue for WebSocket failures
await this.taskQueue?.addTask(agentId, taskAssignment);
success = true;
}
} else {
logger.warn({
agentId,
hasWebSocketServer: !!this.websocketServer,
isAgentConnected: this.websocketServer ? this.websocketServer.isAgentConnected(agentId) : false
}, 'WebSocket server not available or agent not connected, falling back to task queue');
// Fallback to task queue if WebSocket not available
await this.taskQueue?.addTask(agentId, taskAssignment);
success = true;
}
break;
case 'http':
// Send to agent's HTTP endpoint
if (this.httpAgentAPI && agent.httpEndpoint) {
try {
success = await this.sendTaskViaHTTP(
agent,
agentId,
taskId,
taskPayload,
taskAssignment.priority
);
if (success) {
logger.info({ agentId, taskId, httpEndpoint: agent.httpEndpoint }, 'Task sent to agent via HTTP');
} else {
logger.warn({ agentId, taskId, httpEndpoint: agent.httpEndpoint }, 'HTTP task delivery returned false, falling back to task queue');
await this.taskQueue?.addTask(agentId, taskAssignment);
success = true;
}
} catch (error) {
logger.warn({ err: error, agentId, httpEndpoint: agent.httpEndpoint }, 'HTTP task delivery failed, falling back to task queue');
// Fallback to task queue for HTTP failures
await this.taskQueue?.addTask(agentId, taskAssignment);
success = true;
}
} else {
logger.warn({
agentId,
hasHttpAPI: !!this.httpAgentAPI,
hasDeliverMethod: !!(this.httpAgentAPI && 'deliverTaskToAgent' in this.httpAgentAPI),
hasEndpoint: !!agent.httpEndpoint,
httpEndpoint: agent.httpEndpoint
}, 'HTTP API not available or agent has no endpoint, falling back to task queue');
// Fallback to task queue if HTTP not available
await this.taskQueue?.addTask(agentId, taskAssignment);
success = true;
}
break;
default:
logger.error({ agentId, transportType: agent.transportType }, 'Unknown transport type');
return false;
}
if (success) {
logger.info({
agentId,
taskId,
transportType: agent.transportType,
payloadLength: taskPayload.length
}, 'Task sent to agent via universal communication channel');
}
return success;
} catch (error) {
logger.error({ err: error, agentId }, 'Failed to send task to agent');
return false;
}
}
async receiveResponse(agentId: string, timeout: number = 30000): Promise<string> {
try {
// Ensure dependencies are ready before operation
await this.ensureDependencies();
const startTime = Date.now();
// Poll for agent responses
while (Date.now() - startTime < timeout) {
const responses = await this.responseProcessor?.getAgentResponses(agentId) || [];
// Find the most recent response
if (responses.length > 0) {
const latestResponse = responses[responses.length - 1];
// Format response in expected format
const formattedResponse = this.formatAgentResponse(latestResponse);
logger.debug({
agentId,
taskId: latestResponse.task_id,
status: latestResponse.status
}, 'Agent response received');
return formattedResponse;
}
// Wait 100ms before checking again
await new Promise(resolve => setTimeout(resolve, 100));
}
throw new Error(`Timeout waiting for response from agent ${agentId}`);
} catch (error) {
logger.error({ err: error, agentId }, 'Failed to receive response from agent');
throw error;
}
}
async isAgentReachable(agentId: string): Promise<boolean> {
try {
// Ensure dependencies are ready before operation
await this.ensureDependencies();
const agent = await this.agentRegistry?.getAgent(agentId);
if (!agent) {
return false;
}
// Transport-specific reachability checks
let isReachable = false;
const now = Date.now();
const lastSeen = agent.lastSeen || 0;
const maxInactivity = 5 * 60 * 1000; // 5 minutes
switch (agent.transportType) {
case 'stdio':
case 'sse':
// Check if agent is online and recently active
isReachable = agent.status === 'online' && (now - lastSeen) < maxInactivity;
break;
case 'websocket':
// Check WebSocket connection status
if (this.websocketServer) {
isReachable = this.websocketServer.isAgentConnected(agentId) &&
agent.status === 'online' &&
(now - lastSeen) < maxInactivity;
}
break;
case 'http': {
// For HTTP agents, check last heartbeat/polling activity and endpoint availability
const hasHttpEndpoint = !!(agent.httpEndpoint && this.httpAgentAPI);
isReachable = agent.status === 'online' &&
(now - lastSeen) < maxInactivity &&
hasHttpEndpoint;
if (!hasHttpEndpoint) {
logger.debug({
agentId,
hasEndpoint: !!agent.httpEndpoint,
hasHttpAPI: !!this.httpAgentAPI
}, 'HTTP agent missing endpoint or API service');
}
break;
}
default:
isReachable = false;
}
logger.debug({
agentId,
transportType: agent.transportType,
status: agent.status,
lastSeen: new Date(lastSeen).toISOString(),
isReachable
}, 'Agent reachability check');
return isReachable;
} catch (error) {
logger.error({ err: error, agentId }, 'Failed to check agent reachability');
return false;
}
}
async close(): Promise<void> {
try {
logger.info('Universal agent communication channel closed');
// No cleanup needed for the universal channel
// Individual components manage their own lifecycle
} catch (error) {
logger.error({ err: error }, 'Error closing universal communication channel');
}
}
/**
* Ensure transport services are started
*/
private async ensureTransportServicesStarted(): Promise<void> {
try {
// Ensure transport services are started using coordinator
try {
const { transportCoordinator } = await import('../../../services/transport-coordinator.js');
await transportCoordinator.ensureTransportsStarted();
logger.debug('Transport services ensured through coordinator');
} catch (error) {
logger.warn('Failed to ensure transport services through coordinator:', error);
// Fallback to direct transport manager if coordinator fails
const transportStatus = transportManager.getStatus();
if (!transportStatus.isStarted && !transportStatus.startupInProgress) {
logger.info('Fallback: Starting transport services directly...');
transportManager.configure({
websocket: { enabled: true, port: 8080, path: '/agent-ws' },
http: { enabled: true, port: 3001, cors: true },
sse: { enabled: true },
stdio: { enabled: true }
});
await transportManager.startAll();
}
}
// Verify WebSocket and HTTP services are available
const allocatedPorts = transportManager.getAllocatedPorts();
if (!allocatedPorts.websocket && this.websocketServer) {
logger.warn('WebSocket service not allocated port, may not be available');
}
if (!allocatedPorts.http && this.httpAgentAPI) {
logger.warn('HTTP service not allocated port, may not be available');
}
} catch (error) {
logger.warn({ err: error }, 'Failed to ensure transport services are started, continuing with fallback');
}
}
/**
* Log transport endpoint information using dynamic port allocation
*/
private logTransportEndpoints(): void {
try {
const allocatedPorts = transportManager.getAllocatedPorts();
const endpoints = transportManager.getServiceEndpoints();
logger.info({
allocatedPorts,
endpoints,
note: 'Agent orchestrator using dynamic port allocation'
}, 'Transport endpoints available for agent communication');
} catch (error) {
logger.warn({ err: error }, 'Failed to get transport endpoint information');
}
}
/**
* Get transport status for agent communication
*/
getTransportStatus(): {
websocket: { available: boolean; port?: number; endpoint?: string };
http: { available: boolean; port?: number; endpoint?: string };
sse: { available: boolean; port?: number; endpoint?: string };
stdio: { available: boolean };
} {
try {
const allocatedPorts = transportManager.getAllocatedPorts();
const endpoints = transportManager.getServiceEndpoints();
return {
websocket: {
available: !!allocatedPorts.websocket,
port: allocatedPorts.websocket,
endpoint: endpoints.websocket
},
http: {
available: !!allocatedPorts.http,
port: allocatedPorts.http,
endpoint: endpoints.http
},
sse: {
available: !!allocatedPorts.sse,
port: allocatedPorts.sse,
endpoint: endpoints.sse
},
stdio: {
available: true // stdio is always available
}
};
} catch (error) {
logger.warn({ err: error }, 'Failed to get transport status');
return {
websocket: { available: false },
http: { available: false },
sse: { available: false },
stdio: { available: true }
};
}
}
private extractTaskIdFromPayload(taskPayload: string): string {
try {
const lines = taskPayload.split('\n');
const jsonStart = lines.findIndex(line => line.includes('{'));
const jsonEnd = lines.findIndex(line => line.includes('### VIBE_TASK_END'));
if (jsonStart === -1 || jsonEnd === -1) {
return 'unknown';
}
const jsonPayload = lines.slice(jsonStart, jsonEnd).join('\n');
const taskData = JSON.parse(jsonPayload);
return taskData.metadata?.task_id || taskData.task?.id || 'unknown';
} catch (error) {
logger.debug({ err: error }, 'Failed to extract task ID from payload');
return 'unknown';
}
}
/**
* Get agent responses through unified processor
*/
async getAgentResponses(agentId: string): Promise<AgentResponse[]> {
try {
// Import AgentResponseProcessor dynamically
const { AgentResponseProcessor } = await import('../../agent-response/index.js');
const responseProcessor = AgentResponseProcessor.getInstance();
// Get responses for all tasks assigned to this agent
const agentResponses: AgentResponse[] = [];
// Note: this.assignments is from the AgentOrchestrator class, not UniversalAgentCommunicationChannel
// We need to access the orchestrator instance to get assignments
const orchestrator = AgentOrchestrator.getInstance();
for (const [taskId, assignment] of orchestrator.getAssignmentsMap().entries()) {
if (assignment.agentId === agentId) {
const response = await responseProcessor.getResponse(taskId);
if (response) {
agentResponses.push(response as unknown as AgentResponse);
}
}
}
return agentResponses;
} catch (error) {
logger.warn({ err: error, agentId }, 'Failed to get agent responses through unified processor');
return [];
}
}
private formatAgentResponse(response: AgentResponse): string {
try {
// Convert agent response to expected Sentinel Protocol format
let formattedResponse = `VIBE_STATUS: ${response.status}\n`;
if (response.message) {
formattedResponse += response.message;
}
if (response.completion_details) {
const details = response.completion_details;
if (details.files_modified && details.files_modified.length > 0) {
formattedResponse += `\nFiles modified: ${details.files_modified.join(', ')}`;
}
if (details.tests_passed !== undefined) {
formattedResponse += `\nTests passed: ${details.tests_passed}`;
}
if (details.build_successful !== undefined) {
formattedResponse += `\nBuild successful: ${details.build_successful}`;
}
if (details.notes) {
formattedResponse += `\nNotes: ${details.notes}`;
}
}
return formattedResponse;
} catch (error) {
logger.error({ err: error }, 'Failed to format agent response');
return `VIBE_STATUS: ERROR\nFailed to format response: ${error instanceof Error ? error.message : 'Unknown error'}`;
}
}
/**
* Adapter method for WebSocket task delivery
* Converts from expected interface to actual WebSocketServerManager interface
*/
private async sendTaskViaWebSocket(
agentId: string,
taskId: string,
sentinelPayload: string,
priority: string,
_assignedAt: number
): Promise<boolean> {
if (!this.websocketServer) {
return false;
}
// Convert to the interface expected by WebSocketServerManager
return this.websocketServer.sendTaskToAgent(agentId, {
taskId,
task: sentinelPayload, // Map sentinelPayload to task
priority: priority === 'urgent' ? 3 : priority === 'high' ? 2 : priority === 'normal' ? 1 : 0
});
}
/**
* Adapter method for HTTP task delivery
* Converts from expected interface to actual HTTPAgentAPIServer interface
*/
private async sendTaskViaHTTP(
agent: { httpEndpoint?: string; [key: string]: unknown },
agentId: string,
taskId: string,
taskPayload: string,
priority: string
): Promise<boolean> {
if (!this.httpAgentAPI || !agent.httpEndpoint) {
return false;
}
// Check if deliverTaskToAgent is available (it might be private)
if (!('deliverTaskToAgent' in this.httpAgentAPI) ||
typeof (this.httpAgentAPI as unknown as { deliverTaskToAgent?: unknown }).deliverTaskToAgent !== 'function') {
logger.warn('HTTPAgentAPIServer.deliverTaskToAgent is not accessible');
return false;
}
try {
// Parse the task payload to create the expected TaskPayload object
const parsedPayload = JSON.parse(taskPayload);
const taskPayloadObj = {
type: parsedPayload.type || 'task',
description: parsedPayload.description || '',
parameters: parsedPayload.parameters || {},
context: parsedPayload.context || {}
};
// Use fetch instead of private method
const headers: Record<string, string> = {
'Content-Type': 'application/json'
};
const httpAuthToken = agent.metadata && typeof agent.metadata === 'object' && 'preferences' in agent.metadata && agent.metadata.preferences
? (agent.metadata.preferences as Record<string, unknown>).httpAuthToken : undefined;
if (httpAuthToken && typeof httpAuthToken === 'string') {
headers['Authorization'] = `Bearer ${httpAuthToken}`;
}
const response = await fetch(agent.httpEndpoint!, {
method: 'POST',
headers,
body: JSON.stringify({
taskId,
taskPayload: taskPayloadObj,
priority: priority as 'low' | 'normal' | 'high',
deadline: Date.now() + 24 * 60 * 60 * 1000,
assignedAt: Date.now()
})
});
return response.ok;
} catch (error) {
logger.error({ err: error, agentId, taskId }, 'Failed to parse task payload for HTTP delivery');
return false;
}
}
}
/**
* Agent Orchestrator Service
*/
export class AgentOrchestrator {
private static instance: AgentOrchestrator | null = null;
private static isInitializing = false; // Initialization guard to prevent circular initialization
private agents = new Map<string, AgentInfo>();
private assignments = new Map<string, TaskAssignment>();
private taskQueue: string[] = [];
private sentinelProtocol: SentinelProtocol;
private memoryManager: MemoryManager;
private config: OrchestratorConfig;
private heartbeatTimer?: NodeJS.Timeout;
private agentHeartbeatMisses = new Map<string, number>(); // Track missed heartbeats per agent
private integrationBridge: AgentIntegrationBridge;
private workflowAwareManager: WorkflowAwareAgentManager;
private isBridgeRegistration = false; // Flag to prevent circular registration
// New execution tracking and communication
private activeExecutions = new Map<string, TaskExecutionResult>();
private communicationChannel: AgentCommunicationChannel;
private executionMonitors = new Map<string, NodeJS.Timeout>();
private sseNotifier: SSENotifier | null = null;
// Task completion callbacks
private taskCompletionCallbacks = new Map<string, (taskId: string, success: boolean, details?: Record<string, unknown>) => Promise<void>>();
private constructor(config?: Partial<OrchestratorConfig>) {
// Get timeout manager for better defaults
const timeoutManager = getTimeoutManager();
this.config = {
heartbeatInterval: 30000, // 30 seconds
taskTimeout: timeoutManager.getTimeout('taskExecution'), // Use configurable timeout
maxRetries: timeoutManager.getRetryConfig().maxRetries, // Use configurable retries
loadBalancingStrategy: 'capability_based',
enableHealthChecks: true,
conflictResolutionStrategy: 'queue',
heartbeatTimeoutMultiplier: 3, // 3 missed heartbeats = offline
enableAdaptiveTimeouts: true, // Enable complexity-based timeouts
maxHeartbeatMisses: 5, // Allow up to 5 missed heartbeats with exponential backoff
...config
};
this.sentinelProtocol = new SentinelProtocol({
timeout_minutes: this.config.taskTimeout / 60000
});
this.memoryManager = new MemoryManager();
this.communicationChannel = new UniversalAgentCommunicationChannel();
this.integrationBridge = AgentIntegrationBridge.getInstance();
this.workflowAwareManager = WorkflowAwareAgentManager.getInstance({
baseHeartbeatInterval: this.config.heartbeatInterval,
enableAdaptiveTimeouts: this.config.enableAdaptiveTimeouts,
maxGracePeriods: this.config.maxHeartbeatMisses
});
// Initialize SSE notifier asynchronously
this.initializeSSENotifier().catch(error => {
logger.warn({ err: error }, 'Failed to initialize SSE notifier');
});
this.startHeartbeatMonitoring();
// Start workflow-aware agent monitoring
this.workflowAwareManager.startMonitoring().catch(error => {
logger.warn({ err: error }, 'Failed to start workflow-aware agent monitoring');
});
// Start agent synchronization
this.integrationBridge.startAutoSync(60000); // Sync every minute
// Register scheduler callback for task completion notifications
this.registerSchedulerCallback().catch(error => {
logger.warn({ err: error }, 'Failed to register scheduler callback during initialization');
});
logger.info({ config: this.config }, 'Agent orchestrator initialized with integration bridge');
}
/**
* Initialize SSE notifier
*/
private async initializeSSENotifier(): Promise<void> {
try {
const { sseNotifier } = await import('../../../services/sse-notifier/index.js');
this.sseNotifier = sseNotifier;
logger.debug('SSE notifier initialized for agent orchestrator');
} catch (error) {
logger.warn({ err: error }, 'Failed to initialize SSE notifier');
this.sseNotifier = null;
}
}
/**
* Get singleton instance
*/
static getInstance(config?: Partial<OrchestratorConfig>): AgentOrchestrator {
if (AgentOrchestrator.isInitializing) {
logger.warn('Circular initialization detected in AgentOrchestrator, using safe fallback');
return AgentOrchestrator.createSafeFallback();
}
if (!AgentOrchestrator.instance) {
const monitor = InitializationMonitor.getInstance();
monitor.startServiceInitialization('AgentOrchestrator', [
'TransportManager',
'MemoryManager',
'AgentIntegrationBridge',
'WorkflowAwareAgentManager'
], { config });
AgentOrchestrator.isInitializing = true;
try {
monitor.startPhase('AgentOrchestrator', 'constructor');
AgentOrchestrator.instance = new AgentOrchestrator(config);
monitor.endPhase('AgentOrchestrator', 'constructor');
monitor.endServiceInitialization('AgentOrchestrator');
} catch (error) {
monitor.endPhase('AgentOrchestrator', 'constructor', error as Error);
monitor.endServiceInitialization('AgentOrchestrator', error as Error);
throw error;
} finally {
AgentOrchestrator.isInitializing = false;
}
}
return AgentOrchestrator.instance;
}
/**
* Create safe fallback instance to prevent recursion
*/
private static createSafeFallback(): AgentOrchestrator {
const fallback = Object.create(AgentOrchestrator.prototype);
// Initialize with minimal safe properties
fallback.agents = new Map();
fallback.assignments = new Map();
fallback.taskQueue = [];
fallback.agentHeartbeatMisses = new Map();
fallback.isBridgeRegistration = false;
// Provide safe no-op methods
fallback.registerAgent = async () => {
logger.warn('AgentOrchestrator fallback: registerAgent called during initialization');
};
fallback.assignTask = async () => {
logger.warn('AgentOrchestrator fallback: assignTask called during initialization');
return null;
};
fallback.getAgents = async () => {
logger.warn('AgentOrchestrator fallback: getAgents called during initialization');
return [];
};
return fallback;
}
/**
* Register a new agent (enhanced with integration bridge)
*/
async registerAgent(agentInfo: Omit<AgentInfo, 'lastHeartbeat' | 'performance'>): Promise<void> {
const result = await OperationCircuitBreaker.safeExecute(
`registerAgent_${agentInfo.id}`,
async () => {
const fullAgentInfo: AgentInfo = {
...agentInfo,
lastHeartbeat: new Date(),
performance: {
tasksCompleted: 0,
averageCompletionTime: 0,
successRate: 1.0
}
};
this.agents.set(agentInfo.id, fullAgentInfo);
// Only trigger integration bridge if this is not already a bridge-initiated registration
if (!this.isBridgeRegistration) {
try {
await this.integrationBridge.registerAgent({
id: agentInfo.id,
name: agentInfo.name,
capabilities: agentInfo.capabilities.map(cap => cap.toString()),
status: agentInfo.status === 'available' ? 'online' as const : agentInfo.status as 'online' | 'offline' | 'busy',
maxConcurrentTasks: agentInfo.maxConcurrentTasks,
currentTasks: agentInfo.currentTasks,
transportType: (agentInfo.metadata.preferences?.transportType as 'stdio' | 'sse' | 'websocket' | 'http') || 'stdio',
sessionId: agentInfo.metadata.preferences?.sessionId as string,
pollingInterval: agentInfo.metadata.preferences?.pollingInterval as number,
registeredAt: Date.now(),
lastSeen: Date.now(),
lastHeartbeat: fullAgentInfo.lastHeartbeat,
performance: fullAgentInfo.performance,
httpEndpoint: agentInfo.metadata.preferences?.httpEndpoint as string,
httpAuthToken: agentInfo.metadata.preferences?.httpAuthToken as string,
metadata: agentInfo.metadata
});
logger.info({
agentId: agentInfo.id,
capabilities: agentInfo.capabilities
}, 'Agent registered in both orchestrator and registry via integration bridge');
} catch (bridgeError) {
logger.warn({ err: bridgeError, agentId: agentInfo.id }, 'Integration bridge registration failed, agent registered in orchestrator only');
}
}
// Trigger memory cleanup if needed
this.memoryManager.getMemoryStats();
return true;
},
() => {
logger.warn({ agentId: agentInfo.id }, 'Agent registration failed due to circuit breaker, using fallback (agent not registered)');
return false;
},
{
failureThreshold: 3,
timeout: 30000,
operationTimeout: 10000
}
);
if (!result.success && result.error) {
throw new AppError('Agent registration failed', { cause: result.error });
}
}
/**
* Unregister an agent
*/
async unregisterAgent(agentId: string): Promise<void> {
try {
const agent = this.agents.get(agentId);
if (!agent) {
const errorContext = createErrorContext('AgentOrchestrator', 'unassignTask')
.agentId(agentId)
.build();
throw new ValidationError(
`Agent not found: ${agentId}`,
errorContext,
{
field: 'agentId',
expectedFormat: 'Valid agent ID',
actualValue: agentId
}
);
}
// Reassign any current tasks
await this.reassignAgentTasks(agentId);
this.agents.delete(agentId);
logger.info({ agentId }, 'Agent unregistered');
} catch (error) {
logger.error({ err: error, agentId }, 'Failed to unregister agent');
throw new AppError('Agent unregistration failed', { cause: error });
}
}
/**
* Update agent heartbeat (enhanced with workflow awareness)
*/
updateAgentHeartbeat(agentId: string, status?: AgentInfo['status']): void {
const agent = this.agents.get(agentId);
if (agent) {
const oldStatus = agent.status;
agent.lastHeartbeat = new Date();
if (status) {
agent.status = status;
}
// Reset missed heartbeat counter on successful heartbeat
this.agentHeartbeatMisses.delete(agentId);
// Update workflow-aware manager with heartbeat
const agentState = this.workflowAwareManager.getAgentState(agentId);
if (agentState) {
// Update progress as heartbeat (maintains current activity)
this.workflowAwareManager.updateAgentProgress(agentId, agentState.progressPercentage, {
heartbeatUpdate: new Date(),
orchestratorStatus: status
}).catch(error => {
logger.warn({ err: error, agentId }, 'Failed to update workflow-aware manager on heartbeat');
});
} else if (status === 'available') {
// Register agent as idle if not already tracked
this.workflowAwareManager.registerAgentActivity(agentId, 'idle', {
metadata: { autoRegisteredOnHeartbeat: true }
}).catch(error => {
logger.warn({ err: error, agentId }, 'Failed to register agent activity on heartbeat');
});
}
// Propagate status change if it changed
if (status && status !== oldStatus) {
this.integrationBridge.propagateStatusChange(agentId, status, 'orchestrator')
.catch(error => {
logger.warn({ err: error, agentId, status }, 'Failed to propagate status change from heartbeat update');
});
}
logger.debug({ agentId, status }, 'Agent heartbeat updated with workflow awareness');
}
}
/**
* Get adaptive timeout for task based on complexity
*/
getAdaptiveTaskTimeout(task: AtomicTask): number {
if (!this.config.enableAdaptiveTimeouts) {
return this.config.taskTimeout;
}
const timeoutManager = getTimeoutManager();
// Determine task complexity based on task properties
const complexity = this.determineTaskComplexity(task);
// Get estimated hours from task
const estimatedHours = task.estimatedHours || 1;
return timeoutManager.getComplexityAdjustedTimeout('taskExecution', complexity, estimatedHours);
}
/**
* Determine task complexity based on task properties
*/
private determineTaskComplexity(task: AtomicTask): TaskComplexity {
const estimatedHours = task.estimatedHours || 1;
const priority = task.priority || 'medium';
const dependencies = task.dependencies?.length || 0;
// Complex scoring algorithm
let complexityScore = 0;
// Time-based scoring
if (estimatedHours <= 1) complexityScore += 1;
else if (estimatedHours <= 4) complexityScore += 2;
else if (estimatedHours <= 8) complexityScore += 3;
else complexityScore += 4;
// Priority-based scoring
if (priority === 'critical') complexityScore += 2;
else if (priority === 'high') complexityScore += 1;
// Dependency-based scoring
if (dependencies > 5) complexityScore += 2;
else if (dependencies > 2) complexityScore += 1;
// Task type scoring (if available)
if (task.type === 'development' || task.type === 'deployment') complexityScore += 2;
else if (task.type === 'testing' || task.type === 'documentation') complexityScore -= 1;
// Map score to complexity
if (complexityScore <= 2) return 'simple';
else if (complexityScore <= 4) return 'moderate';
else if (complexityScore <= 6) return 'complex';
else return 'critical';
}
/**
* Assign task to best available agent
*/
async assignTask(
task: AtomicTask,
context: ProjectContext,
epicTitle?: string
): Promise<TaskAssignment | null> {
const errorContext = createErrorContext('AgentOrchestrator', 'assignTask')
.taskId(task.id)
.metadata({
taskType: task.type,
taskPriority: task.priority,
availableAgents: this.agents.size,
queuedTasks: this.taskQueue.length
})
.build();
try {
// Validate task input
if (!task.id || task.id.trim() === '') {
throw new ValidationError(
'Task ID is required for assignment',
errorContext,
{
field: 'task.id',
expectedFormat: 'Non-empty string',
actualValue: task.id
}
);
}
if (!task.title || task.title.trim() === '') {
throw new ValidationError(
'Task title is required for assignment',
errorContext,
{
field: 'task.title',
expectedFormat: 'Non-empty string',
actualValue: task.title
}
);
}
const availableAgent = this.selectBestAgent(task);
if (!availableAgent) {
// Check if we have any agents at all
if (this.agents.size === 0) {
throw new ResourceError(
'No agents are registered in the system',
errorContext,
{
resourceType: 'agents',
availableAmount: 0,
requiredAmount: 1
}
);
}
// All agents are busy - add to queue
this.taskQueue.push(task.id);
logger.info({ taskId: task.id }, 'Task queued - no available agents');
return null;
}
// Validate agent capabilities match task requirements
if (task.type && !this.isAgentCapableOfTask(availableAgent, task)) {
throw new AgentError(
`Agent ${availableAgent.id} lacks required capabilities for task type: ${task.type}`,
errorContext,
{
agentType: availableAgent.capabilities.join(', '),
agentStatus: availableAgent.status,
capabilities: availableAgent.capabilities
}
);
}
// Create unified assignment
const assignment: TaskAssignment = {
id: `assignment_${task.id}_${Date.now()}`,
taskId: task.id,
task: task,
agentId: availableAgent.id,
assignedAt: new Date(),
expectedCompletionAt: new Date(Date.now() + this.config.taskTimeout),
status: 'assigned',
attempts: 1,
lastStatusUpdate: new Date(),
priority: this.mapTaskPriorityToAssignmentPriority(task.priority),
estimatedDuration: task.estimatedHours * 60 * 60 * 1000, // Convert hours to milliseconds
deadline: new Date(Date.now() + this.config.taskTimeout),
context: {
projectId: task.projectId,
epicId: task.epicId,
dependencies: task.dependencies,
resources: [],
constraints: []
},
metadata: {
assignedBy: 'agent-orchestrator',
assignedAt: Date.now(),
executionId: `exec_${task.id}_${Date.now()}`,
retryCount: 0,
maxRetries: this.config.maxRetries
}
};
// Update agent status
const oldStatus = availableAgent.status;
availableAgent.currentTasks.push(task.id);
if (availableAgent.currentTasks.length >= availableAgent.maxConcurrentTasks) {
availableAgent.status = 'busy';
}
// Propagate status change if it changed
if (availableAgent.status !== oldStatus) {
this.integrationBridge.propagateStatusChange(availableAgent.id, availableAgent.status, 'orchestrator')
.catch(error => {
logger.warn({ err: error, agentId: availableAgent.id, status: availableAgent.status }, 'Failed to propagate status change from task assignment');
});
}
// Propagate task assignment
this.integrationBridge.propagateTaskStatusChange(availableAgent.id, task.id, 'assigned', 'orchestrator')
.catch(error => {
logger.warn({ err: error, agentId: availableAgent.id, taskId: task.id }, 'Failed to propagate task assignment');
});
// Store assignment
this.assignments.set(task.id, assignment);
// Register task execution activity in workflow-aware manager
this.workflowAwareManager.registerAgentActivity(availableAgent.id, 'task_execution', {
workflowId: task.projectId,
sessionId: (context as unknown as Record<string, unknown>).sessionId as string || `session_${Date.now()}`,
expectedDuration: assignment.estimatedDuration,
isWorkflowCritical: false,
metadata: {
taskId: task.id,
taskType: task.type,
priority: task.priority,
assignmentId: assignment.id
}
}).catch(error => {
logger.warn({ err: error, agentId: availableAgent.id, taskId: task.id }, 'Failed to register task execution activity');
});
// Format task for agent
try {
const taskPayload = this.sentinelProtocol.formatTaskForAgent(task, context, epicTitle);
logger.info({
taskId: task.id,
agentId: availableAgent.id,
payload: taskPayload.substring(0, 200) + '...'
}, 'Task assigned to agent with workflow awareness');
} catch (formatError) {
// Rollback assignment if formatting fails
this.assignments.delete(task.id);
availableAgent.currentTasks = availableAgent.currentTasks.filter(id => id !== task.id);
if (availableAgent.currentTasks.length < availableAgent.maxConcurrentTasks) {
availableAgent.status = 'available';
}
throw new TaskExecutionError(
`Failed to format task for agent: ${formatError instanceof Error ? formatError.message : String(formatError)}`,
errorContext,
{
cause: formatError instanceof Error ? formatError : undefined,
agentCapabilities: availableAgent.capabilities,
retryable: true
}
);
}
return assignment;
} catch (error) {
if (error instanceof EnhancedError) {
throw error;
}
throw new AgentError(
`Task assignment failed: ${error instanceof Error ? error.message : String(error)}`,
errorContext,
{
cause: error instanceof Error ? error : undefined
}
);
}
}
/**
* Execute task with complete flow: assignment, delivery, monitoring, and result processing
*/
async executeTask(
task: AtomicTask,
context: ProjectContext,
options: ExecutionOptions = {}
): Promise<TaskExecutionResult> {
const executionId = `exec_${task.id}_${Date.now()}`;
const startTime = new Date();
// Validate task inputs
if (!task.id || task.id.trim() === '') {
return {
success: false,
status: 'failed',
message: 'Invalid task: Task ID is required',
startTime,
endTime: new Date(),
error: 'Invalid task ID',
metadata: {
executionId,
attempts: 0
}
};
}
if (!task.title || task.title.trim() === '') {
return {
success: false,
status: 'failed',
message: 'Invalid task: Task title is required',
startTime,
endTime: new Date(),
error: 'Invalid task title',
metadata: {
executionId,
attempts: 0
}
};
}
// Set default options
const execOptions = {
timeout: this.config.taskTimeout,
maxRetries: this.config.maxRetries,
enableMonitoring: true,
...options
};
logger.info({
taskId: task.id,
executionId,
options: execOptions
}, 'Starting task execution');
try {
// Step 1: Assign task to agent
const assignment = await this.assignTask(task, context);
if (!assignment) {
// No agents available - queue for later execution
const result: TaskExecutionResult = {
success: false,
status: 'queued',
message: 'No available agents. Task queued for execution when agents become available.',
startTime,
queued: true,
metadata: {
executionId,
attempts: 0
}
};
this.activeExecutions.set(executionId, result);
return result;
}
// Step 2: Deliver task to agent
const taskPayload = this.sentinelProtocol.formatTaskForAgent(task, context);
const deliverySuccess = await this.communicationChannel.sendTask(assignment.agentId, taskPayload);
if (!deliverySuccess) {
// Task delivery failed
await this.handleExecutionFailure(assignment, 'Task delivery failed');
return {
success: false,
status: 'failed',
message: 'Failed to deliver task to agent',
startTime,
endTime: new Date(),
assignment,
error: 'Task delivery failed',
metadata: {
executionId,
attempts: assignment.attempts,
agentId: assignment.agentId
}
};
}
// Step 3: Monitor execution and wait for completion
const result = await this.monitorTaskExecution(assignment, execOptions, executionId, startTime);
// Step 4: Store and return result
this.activeExecutions.set(executionId, result);
logger.info({
taskId: task.id,
executionId,
status: result.status,
duration: result.endTime ? result.endTime.getTime() - startTime.getTime() : undefined
}, 'Task execution completed');
return result;
} catch (error) {
logger.error({ err: error, taskId: task.id, executionId }, 'Task execution failed with error');
const result: TaskExecutionResult = {
success: false,
status: 'failed',
message: `Execution failed: ${error instanceof Error ? error.message : 'Unknown error'}`,
startTime,
endTime: new Date(),
error: error instanceof Error ? error.message : String(error),
metadata: {
executionId,
attempts: 1
}
};
this.activeExecutions.set(executionId, result);
return result;
}
}
/**
* Monitor task execution with real-time progress tracking
*/
private async monitorTaskExecution(
assignment: TaskAssignment,
options: ExecutionOptions,
executionId: string,
startTime: Date
): Promise<TaskExecutionResult> {
const timeout = options.timeout || this.config.taskTimeout;
const maxRetries = options.maxRetries || this.config.maxRetries;
return new Promise((resolve) => {
let attempts = 0;
let monitoringHandle: NodeJS.Timeout | undefined;
const cleanup = () => {
if (timeoutHandle) clearTimeout(timeoutHandle);
if (monitoringHandle) clearInterval(monitoringHandle);
this.executionMonitors.delete(executionId);
};
const completeExecution = (result: TaskExecutionResult) => {
cleanup();
resolve(result);
};
// Set up timeout
// eslint-disable-next-line prefer-const
let timeoutHandle = setTimeout(async () => {
logger.warn({ taskId: assignment.taskId, executionId }, 'Task execution timeout');
if (attempts < maxRetries) {
attempts++;
logger.info({ taskId: assignment.taskId, attempt: attempts }, 'Retrying task execution');
// Retry execution
try {
const retryResult = await this.retryTaskExecution(assignment, options, executionId, startTime, attempts);
completeExecution(retryResult);
} catch (error) {
completeExecution({
success: false,
status: 'failed',
message: `Retry failed: ${error instanceof Error ? error.message : 'Unknown error'}`,
startTime,
endTime: new Date(),
assignment,
error: error instanceof Error ? error.message : String(error),
metadata: {
executionId,
attempts,
agentId: assignment.agentId
}
});
}
} else {
completeExecution({
success: false,
status: 'timeout',
message: `Task execution timed out after ${timeout}ms`,
startTime,
endTime: new Date(),
assignment,
error: 'Execution timeout',
metadata: {
executionId,
attempts,
totalDuration: Date.now() - startTime.getTime(),
agentId: assignment.agentId
}
});
}
}, timeout);
// Set up monitoring
if (options.enableMonitoring) {
monitoringHandle = setInterval(async () => {
try {
// Check for agent response
const responseText = await this.communicationChannel.receiveResponse(assignment.agentId, 1000);
if (responseText) {
// Process the response
const agentResponse = this.sentinelProtocol.parseAgentResponse(responseText, assignment.taskId);
// Update assignment status based on response
assignment.lastStatusUpdate = new Date();
switch (agentResponse.status) {
case 'DONE':
assignment.status = 'completed';
completeExecution({
success: true,
status: 'completed',
message: 'Task completed successfully',
startTime,
endTime: new Date(),
assignment,
agentResponse,
metadata: {
executionId,
attempts: attempts + 1,
totalDuration: Date.now() - startTime.getTime(),
agentId: assignment.agentId
}
});
break;
case 'IN_PROGRESS':
assignment.status = 'in_progress';
logger.debug({
taskId: assignment.taskId,
progress: agentResponse.progress_percentage
}, 'Task progress update');
break;
case 'FAILED':
assignment.status = 'failed';
completeExecution({
success: false,
status: 'failed',
message: agentResponse.message || 'Task failed',
startTime,
endTime: new Date(),
assignment,
agentResponse,
error: agentResponse.message,
metadata: {
executionId,
attempts: attempts + 1,
totalDuration: Date.now() - startTime.getTime(),
agentId: assignment.agentId
}
});
break;
case 'HELP':
case 'BLOCKED':
logger.warn({
taskId: assignment.taskId,
status: agentResponse.status,
details: agentResponse.help_request || agentResponse.blocker_details
}, 'Task requires intervention');
completeExecution({
success: false,
status: 'failed',
message: `Task ${agentResponse.status.toLowerCase()}: ${agentResponse.message}`,
startTime,
endTime: new Date(),
assignment,
agentResponse,
error: `Task ${agentResponse.status.toLowerCase()}`,
metadata: {
executionId,
attempts: attempts + 1,
totalDuration: Date.now() - startTime.getTime(),
agentId: assignment.agentId
}
});
break;
}
}
} catch {
// No response yet, continue monitoring
logger.debug({ taskId: assignment.taskId }, 'No agent response yet, continuing to monitor');
}
}, 2000); // Check every 2 seconds
}
// Store monitoring handle for cleanup if monitoring is enabled
if (options.enableMonitoring && monitoringHandle) {
this.executionMonitors.set(executionId, monitoringHandle);
}
});
}
/**
* Unified response processing that integrates with AgentResponseProcessor
*/
async processAgentResponse(responseText: string, agentId: string): Promise<void> {
try {
const response = this.sentinelProtocol.parseAgentResponse(responseText);
const assignment = this.assignments.get(response.task_id);
if (!assignment) {
logger.warn({ taskId: response.task_id, agentId }, 'Received response for unknown task');
return;
}
if (assignment.agentId !== agentId) {
logger.warn({
taskId: response.task_id,
expectedAgent: assignment.agentId,
actualAgent: agentId
}, 'Response from unexpected agent');
return;
}
// Process response through unified AgentResponseProcessor first
await this.processResponseThroughUnifiedProcessor(response, agentId, assignment);
// Update local assignment status
assignment.lastStatusUpdate = new Date();
// Handle orchestrator-specific response processing
switch (response.status) {
case 'DONE':
await this.handleTaskCompletion(assignment, response);
break;
case 'HELP':
await this.handleHelpRequest(assignment, response);
break;
case 'BLOCKED':
await this.handleTaskBlocked(assignment, response);
break;
case 'IN_PROGRESS':
assignment.status = 'in_progress';
break;
case 'FAILED':
await this.handleTaskFailure(assignment, response);
break;
}
logger.debug({
taskId: response.task_id,
agentId,
status: response.status
}, 'Agent response processed through unified handler');
} catch (error) {
logger.error({ err: error, agentId, responseText }, 'Failed to process agent response');
throw new AppError('Agent response processing failed', { cause: error });
}
}
/**
* Process response through unified AgentResponseProcessor
*/
private async processResponseThroughUnifiedProcessor(
response: AgentResponse,
agentId: string,
_assignment: TaskAssignment
): Promise<void> {
try {
// Import AgentResponseProcessor dynamically to avoid circular dependencies
const { AgentResponseProcessor } = await import('../../agent-response/index.js');
const responseProcessor = AgentResponseProcessor.getInstance();
// Convert orchestrator response format to unified format
const unifiedResponse = {
agentId,
taskId: response.task_id,
status: this.mapResponseStatusToUnified(response.status),
response: response.message || 'Task completed',
completionDetails: this.extractCompletionDetails(response),
receivedAt: Date.now()
};
// Process through unified processor
await responseProcessor.processResponse(unifiedResponse);
logger.debug({
taskId: response.task_id,
agentId,
status: response.status
}, 'Response processed through unified AgentResponseProcessor');
} catch (error) {
logger.warn({ err: error, taskId: response.task_id, agentId },
'Failed to process response through unified processor, continuing with local processing');
// Don't throw - continue with local processing
}
}
/**
* Map orchestrator response status to unified format
*/
private mapResponseStatusToUnified(status: string): 'DONE' | 'ERROR' | 'PARTIAL' {
switch (status) {
case 'DONE':
return 'DONE';
case 'FAILED':
case 'BLOCKED':
return 'ERROR';
case 'IN_PROGRESS':
case 'HELP':
return 'PARTIAL';
default:
return 'PARTIAL';
}
}
/**
* Extract completion details from response
*/
private extractCompletionDetails(response: AgentResponse): {
executionTime: number;
filesModified: string[];
testsRun: number;
testsPassed: number;
deploymentStatus?: string;
notes?: string;
} {
const completionDetails = response.completion_details;
return {
executionTime: 0, // Not available in current AgentResponse format
filesModified: completionDetails?.files_modified || [],
testsPassed: completionDetails?.tests_passed ? 1 : 0, // Convert boolean to number
testsRun: completionDetails?.tests_passed !== undefined ? 1 : 0, // If we have test result, assume 1 test
deploymentStatus: completionDetails?.build_successful ? 'success' : 'failed',
notes: completionDetails?.notes || response.message
};
}
/**
* Register task completion callback
*/
registerTaskCompletionCallback(
taskId: string,
callback: (taskId: string, success: boolean, details?: Record<string, unknown>) => Promise<void>
): void {
this.taskCompletionCallbacks.set(taskId, callback);
logger.debug({ taskId }, 'Task completion callback registered');
}
/**
* Register scheduler callback for all tasks
*/
async registerSchedulerCallback(): Promise<void> {
try {
// Import TaskScheduler dynamically to avoid circular dependencies
const { TaskScheduler } = await import('./task-scheduler.js');
// Create a callback that notifies the scheduler when tasks complete
const schedulerCallback = async (taskId: string, success: boolean, details?: Record<string, unknown>) => {
try {
// Get the current scheduler instance (if any)
const currentScheduler = TaskScheduler.getCurrentInstance();
if (currentScheduler) {
if (success) {
await currentScheduler.markTaskCompleted(taskId);
logger.info({ taskId }, 'Notified scheduler of task completion');
} else {
// Handle task failure - could add markTaskFailed method to scheduler
logger.warn({ taskId, details }, 'Task failed - scheduler notification skipped');
}
} else {
logger.debug({ taskId }, 'No active scheduler instance to notify');
}
} catch (error) {
logger.error({ err: error, taskId }, 'Failed to notify scheduler of task completion');
}
};
// Register this callback for all current assignments
for (const taskId of this.assignments.keys()) {
this.registerTaskCompletionCallback(taskId, schedulerCallback);
}
logger.info('Scheduler callback registered for all current tasks');
} catch (error) {
logger.warn({ err: error }, 'Failed to register scheduler callback');
}
}
/**
* Trigger task completion callbacks
*/
private async triggerTaskCompletionCallbacks(
taskId: string,
success: boolean,
details?: Record<string, unknown>
): Promise<void> {
const callback = this.taskCompletionCallbacks.get(taskId);
if (callback) {
try {
await callback(taskId, success, details);
logger.debug({ taskId, success }, 'Task completion callback triggered');
} catch (error) {
logger.error({ err: error, taskId }, 'Task completion callback failed');
} finally {
// Clean up callback after use
this.taskCompletionCallbacks.delete(taskId);
}
}
}
/**
* Get current task assignments map (for unified response processing)
*/
getAssignmentsMap(): Map<string, TaskAssignment> {
return this.assignments;
}
/**
* Get communication channel for external service coordination
*/
getCommunicationChannel(): AgentCommunicationChannel {
return this.communicationChannel;
}
/**
* Get agent statistics
*/
getAgentStats(): {
totalAgents: number;
availableAgents: number;
busyAgents: number;
offlineAgents: number;
totalAssignments: number;
queuedTasks: number;
} {
const agents = Array.from(this.agents.values());
return {
totalAgents: agents.length,
availableAgents: agents.filter(a => a.status === 'available').length,
busyAgents: agents.filter(a => a.status === 'busy').length,
offlineAgents: agents.filter(a => a.status === 'offline').length,
totalAssignments: this.assignments.size,
queuedTasks: this.taskQueue.length
};
}
/**
* Get all registered agents
*/
getAgents(): AgentInfo[] {
return Array.from(this.agents.values());
}
/**
* Get task assignments
*/
getAssignments(): TaskAssignment[] {
return Array.from(this.assignments.values());
}
/**
* Get active executions
*/
getActiveExecutions(): TaskExecutionResult[] {
return Array.from(this.activeExecutions.values());
}
/**
* Get execution result by ID
*/
getExecutionResult(executionId: string): TaskExecutionResult | undefined {
return this.activeExecutions.get(executionId);
}
/**
* Cancel task execution
*/
async cancelExecution(executionId: string): Promise<boolean> {
const execution = this.activeExecutions.get(executionId);
if (!execution) {
return false;
}
// Clean up monitoring
const monitoringHandle = this.executionMonitors.get(executionId);
if (monitoringHandle) {
clearInterval(monitoringHandle);
this.executionMonitors.delete(executionId);
}
// Update execution status
execution.status = 'failed';
execution.endTime = new Date();
execution.error = 'Execution cancelled';
execution.message = 'Task execution was cancelled';
logger.info({ executionId }, 'Task execution cancelled');
return true;
}
/**
* Check if agent is capable of handling the task
*/
private isAgentCapableOfTask(agent: AgentInfo, task: AtomicTask): boolean {
// If task has no specific type, any agent can handle it
if (!task.type) {
return true;
}
// Map task types to required capabilities
const taskTypeCapabilities: Record<string, string[]> = {
'frontend': ['frontend', 'development', 'general'],
'backend': ['backend', 'development', 'general'],
'database': ['database', 'backend', 'development', 'general'],
'testing': ['testing', 'general'],
'deployment': ['devops', 'deployment', 'general'],
'documentation': ['documentation', 'general'],
'refactoring': ['refactoring', 'development', 'general'],
'debugging': ['debugging', 'development', 'general'],
'development': ['development', 'frontend', 'backend', 'general']
};
const requiredCapabilities = taskTypeCapabilities[task.type] || ['general'];
// Check if agent has any of the required capabilities
return requiredCapabilities.some(capability =>
agent.capabilities.includes(capability as AgentCapability)
);
}
/**
* Map task priority to assignment priority
*/
private mapTaskPriorityToAssignmentPriority(taskPriority: TaskPriority): 'low' | 'normal' | 'high' | 'urgent' {
const priorityMap: Record<TaskPriority, 'low' | 'normal' | 'high' | 'urgent'> = {
'low': 'low',
'medium': 'normal',
'high': 'high',
'critical': 'urgent'
};
return priorityMap[taskPriority] || 'normal';
}
/**
* Select best agent for task based on strategy
*/
private selectBestAgent(task: AtomicTask): AgentInfo | null {
const availableAgents = Array.from(this.agents.values())
.filter(agent =>
agent.status === 'available' &&
agent.currentTasks.length < agent.maxConcurrentTasks
);
if (availableAgents.length === 0) {
return null;
}
switch (this.config.loadBalancingStrategy) {
case 'capability_based':
return this.selectByCapability(availableAgents, task);
case 'performance_based':
return this.selectByPerformance(availableAgents);
case 'round_robin':
default:
return availableAgents[0]; // Simple round-robin
}
}
/**
* Enhanced agent selection by capability matching with load balancing
*/
private selectByCapability(agents: AgentInfo[], task: AtomicTask): AgentInfo | null {
// Enhanced capability mapping for different task types
const taskCapabilityMap: Record<string, string[]> = {
'frontend': ['frontend', 'development', 'general'],
'backend': ['backend', 'development', 'general'],
'database': ['database', 'backend', 'development', 'general'],
'testing': ['testing', 'general'],
'deployment': ['devops', 'deployment', 'general'],
'documentation': ['documentation', 'general'],
'refactoring': ['refactoring', 'development', 'general'],
'debugging': ['debugging', 'development', 'general'],
'development': ['development', 'frontend', 'backend', 'general']
};
const requiredCapabilities = taskCapabilityMap[task.type] || ['general'];
// Find agents with matching capabilities using enhanced matching
const capableAgents = agents.filter(agent =>
this.isAgentCapableForTask(agent, task, requiredCapabilities)
);
if (capableAgents.length === 0) {
// No exact capability match, use load balancing on all available agents
return this.selectByLoadBalancing(agents);
}
if (capableAgents.length === 1) {
return capableAgents[0];
}
// Multiple capable agents - use enhanced selection criteria
return this.selectBestCapableAgent(capableAgents, task);
}
/**
* Enhanced agent capability checking with task context
*/
private isAgentCapableForTask(agent: AgentInfo, task: AtomicTask, requiredCapabilities: string[]): boolean {
// Direct capability match
const hasDirectMatch = requiredCapabilities.some(cap =>
agent.capabilities.includes(cap as AgentCapability)
);
if (hasDirectMatch) {
return true;
}
// Enhanced matching based on task characteristics
const taskTags = task.tags || [];
const taskDescription = task.description.toLowerCase();
// Check for capability matches in tags and description
for (const capability of agent.capabilities) {
const capabilityStr = capability.toString();
if (taskTags.includes(capabilityStr) || taskDescription.includes(capabilityStr)) {
return true;
}
}
// Special capability mappings for enhanced matching
const capabilityMappings = new Map([
['frontend', ['ui', 'react', 'vue', 'angular', 'css', 'html', 'javascript']],
['backend', ['api', 'server', 'database', 'node', 'python', 'java']],
['devops', ['deploy', 'docker', 'kubernetes', 'ci/cd', 'pipeline']],
['testing', ['test', 'spec', 'unit', 'integration', 'e2e']],
['documentation', ['docs', 'readme', 'guide', 'manual']],
['research', ['investigate', 'analyze', 'study', 'explore']]
]);
for (const capability of agent.capabilities) {
const keywords = capabilityMappings.get(capability.toString()) || [];
if (keywords.some(keyword =>
taskDescription.includes(keyword) || taskTags.includes(keyword)
)) {
return true;
}
}
return false;
}
/**
* Select agent using load balancing criteria
*/
private selectByLoadBalancing(agents: AgentInfo[]): AgentInfo {
// Sort by current load (fewer current tasks = lower load)
return agents.reduce((best, current) => {
const bestLoad = best.currentTasks.length / best.maxConcurrentTasks;
const currentLoad = current.currentTasks.length / current.maxConcurrentTasks;
return currentLoad < bestLoad ? current : best;
});
}
/**
* Select the best agent from capable agents using multiple criteria
*/
private selectBestCapableAgent(capableAgents: AgentInfo[], task: AtomicTask): AgentInfo {
return capableAgents.reduce((best, current) => {
const bestScore = this.calculateAgentScore(best, task);
const currentScore = this.calculateAgentScore(current, task);
return currentScore > bestScore ? current : best;
});
}
/**
* Calculate comprehensive agent score for task assignment
*/
private calculateAgentScore(agent: AgentInfo, task: AtomicTask): number {
// Load score (lower load is better)
const loadRatio = agent.currentTasks.length / agent.maxConcurrentTasks;
const loadScore = Math.max(0, 1 - loadRatio) * 40; // 40% weight
// Performance score
const performanceScore = (
agent.performance.successRate * 0.6 +
(1 / Math.max(1, agent.performance.averageCompletionTime / 3600)) * 0.4
) * 30; // 30% weight
// Capability relevance score
const capabilityScore = this.calculateCapabilityRelevance(agent, task) * 20; // 20% weight
// Context score (same project/epic bonus)
const contextScore = this.calculateContextScore(agent, task) * 10; // 10% weight
return loadScore + performanceScore + capabilityScore + contextScore;
}
/**
* Calculate how relevant an agent's capabilities are for the task
*/
private calculateCapabilityRelevance(agent: AgentInfo, task: AtomicTask): number {
const taskType = task.type;
const taskTags = task.tags || [];
const taskDescription = task.description.toLowerCase();
let relevanceScore = 0;
// Direct task type match
if (agent.capabilities.some(cap => cap.toString() === taskType)) {
relevanceScore += 50;
}
// Tag matches
for (const tag of taskTags) {
if (agent.capabilities.some(cap => cap.toString().includes(tag))) {
relevanceScore += 10;
}
}
// Description keyword matches
const keywords = ['frontend', 'backend', 'api', 'database', 'test', 'deploy', 'docs'];
for (const keyword of keywords) {
if (taskDescription.includes(keyword)) {
if (agent.capabilities.some(cap => cap.toString().includes(keyword))) {
relevanceScore += 5;
}
}
}
return Math.min(100, relevanceScore); // Cap at 100
}
/**
* Calculate context score based on agent's current work
*/
private calculateContextScore(agent: AgentInfo, task: AtomicTask): number {
let contextScore = 0;
// Check if agent is already working on tasks from the same project/epic
for (const currentTaskId of agent.currentTasks) {
// In a real implementation, we would fetch the current task details
// For now, we'll use a simplified scoring based on task ID patterns
if (currentTaskId.includes(task.projectId)) {
contextScore += 30; // Same project bonus
}
if (currentTaskId.includes(task.epicId)) {
contextScore += 20; // Same epic bonus
}
}
return Math.min(100, contextScore); // Cap at 100
}
/**
* Select agent by performance metrics
*/
private selectByPerformance(agents: AgentInfo[]): AgentInfo {
return agents.reduce((best, current) => {
const bestScore = best.performance.successRate * (1 / (best.performance.averageCompletionTime || 1));
const currentScore = current.performance.successRate * (1 / (current.performance.averageCompletionTime || 1));
return currentScore > bestScore ? current : best;
});
}
/**
* Handle execution failure
*/
private async handleExecutionFailure(assignment: TaskAssignment, reason: string): Promise<void> {
assignment.status = 'failed';
assignment.lastStatusUpdate = new Date();
const agent = this.agents.get(assignment.agentId);
if (agent) {
// Remove task from agent's current tasks
agent.currentTasks = agent.currentTasks.filter(id => id !== assignment.taskId);
// Update agent status if no longer busy
if (agent.currentTasks.length < agent.maxConcurrentTasks) {
agent.status = 'available';
}
}
logger.error({
taskId: assignment.taskId,
agentId: assignment.agentId,
reason
}, 'Task execution failed');
}
/**
* Retry task execution
*/
private async retryTaskExecution(
assignment: TaskAssignment,
options: ExecutionOptions,
executionId: string,
startTime: Date,
attempt: number
): Promise<TaskExecutionResult> {
logger.info({
taskId: assignment.taskId,
attempt,
maxRetries: options.maxRetries
}, 'Retrying task execution');
try {
// Reset assignment status
assignment.status = 'assigned';
assignment.attempts = attempt;
assignment.lastStatusUpdate = new Date();
// Get task payload again
const agent = this.agents.get(assignment.agentId);
if (!agent) {
throw new Error(`Agent ${assignment.agentId} not found for retry`);
}
// For retry, we need to reconstruct the task and context
// In a full implementation, these would be stored with the assignment
// For now, return failure since we don't have the original task/context
return {
success: false,
status: 'failed',
message: `Task retry failed: Original task and context not available for retry attempt ${attempt}`,
startTime,
endTime: new Date(),
assignment,
error: 'Task and context reconstruction not implemented for retries',
metadata: {
executionId,
attempts: attempt,
totalDuration: Date.now() - startTime.getTime(),
agentId: assignment.agentId
}
};
} catch (error) {
logger.error({ err: error, taskId: assignment.taskId, attempt }, 'Task retry failed');
throw error;
}
}
/**
* Handle task completion
*/
private async handleTaskCompletion(assignment: TaskAssignment, response: AgentResponse): Promise<void> {
assignment.status = 'completed';
// Update agent performance
const agent = this.agents.get(assignment.agentId);
if (agent) {
const oldStatus = agent.status;
agent.performance.tasksCompleted++;
agent.performance.lastTaskCompletedAt = new Date();
// Remove task from agent's current tasks
agent.currentTasks = agent.currentTasks.filter(id => id !== assignment.taskId);
// Update agent status if no longer busy
if (agent.currentTasks.length < agent.maxConcurrentTasks) {
agent.status = 'available';
}
// Propagate status change if it changed
if (agent.status !== oldStatus) {
this.integrationBridge.propagateStatusChange(agent.id, agent.status, 'orchestrator')
.catch(error => {
logger.warn({ err: error, agentId: agent.id, status: agent.status }, 'Failed to propagate status change from task completion');
});
}
// Propagate task completion
this.integrationBridge.propagateTaskStatusChange(agent.id, assignment.taskId, 'completed', 'orchestrator')
.catch(error => {
logger.warn({ err: error, agentId: agent.id, taskId: assignment.taskId }, 'Failed to propagate task completion');
});
// Send SSE notification for task completion (moved after completionDetails definition)
// This will be added after completionDetails is defined
}
// Trigger task completion callbacks (notify scheduler)
const completionDetails = {
agentId: assignment.agentId,
duration: Date.now() - assignment.assignedAt.getTime(),
response: response.message,
completionDetails: response.completion_details
};
await this.triggerTaskCompletionCallbacks(assignment.taskId, true, completionDetails);
// Send SSE notification for task completion
if (agent) {
const sessionId = agent.metadata?.preferences?.sessionId;
if (this.sseNotifier && sessionId) {
this.sseNotifier.sendEvent(sessionId as string, 'taskCompleted', {
agentId: agent.id,
taskId: assignment.taskId,
completedAt: new Date().toISOString(),
duration: completionDetails.duration,
response: completionDetails.response
}).catch((error: unknown) => {
logger.warn({ err: error, agentId: agent.id, taskId: assignment.taskId }, 'Failed to send SSE task completion notification');
});
// Broadcast task completion for monitoring
this.sseNotifier.broadcastEvent('taskCompletionUpdate', {
agentId: agent.id,
taskId: assignment.taskId,
status: 'completed',
completedAt: new Date().toISOString(),
duration: completionDetails.duration
}).catch((error: unknown) => {
logger.warn({ err: error }, 'Failed to broadcast SSE task completion update');
});
}
}
// Process next queued task if available
await this.processTaskQueue();
logger.info({
taskId: assignment.taskId,
agentId: assignment.agentId,
duration: completionDetails.duration
}, 'Task completed successfully and callbacks triggered');
}
/**
* Handle help request
*/
private async handleHelpRequest(assignment: TaskAssignment, response: AgentResponse): Promise<void> {
logger.warn({
taskId: assignment.taskId,
agentId: assignment.agentId,
helpRequest: response.help_request
}, 'Agent requested help');
// For now, just log the help request
// In a full implementation, this could trigger human intervention
}
/**
* Handle blocked task
*/
private async handleTaskBlocked(assignment: TaskAssignment, response: AgentResponse): Promise<void> {
logger.warn({
taskId: assignment.taskId,
agentId: assignment.agentId,
blockerDetails: response.blocker_details
}, 'Task blocked');
// For now, just log the blocker
// In a full implementation, this could trigger dependency resolution
}
/**
* Handle task failure
*/
private async handleTaskFailure(assignment: TaskAssignment, response: AgentResponse): Promise<void> {
assignment.status = 'failed';
assignment.attempts++;
const agent = this.agents.get(assignment.agentId);
if (agent) {
// Remove task from agent's current tasks
agent.currentTasks = agent.currentTasks.filter(id => id !== assignment.taskId);
// Update agent status
if (agent.currentTasks.length < agent.maxConcurrentTasks) {
agent.status = 'available';
}
}
// Retry if under max attempts
if (assignment.attempts < this.config.maxRetries) {
this.taskQueue.unshift(assignment.taskId); // Add to front of queue for retry
logger.info({
taskId: assignment.taskId,
attempt: assignment.attempts
}, 'Task queued for retry');
} else {
// Task failed permanently - trigger failure callbacks
const failureDetails = {
agentId: assignment.agentId,
attempts: assignment.attempts,
response: response.message,
error: 'Task failed after max retries'
};
await this.triggerTaskCompletionCallbacks(assignment.taskId, false, failureDetails);
logger.error({
taskId: assignment.taskId,
agentId: assignment.agentId,
attempts: assignment.attempts
}, 'Task failed after max retries and callbacks triggered');
}
}
/**
* Reassign tasks from an agent
*/
private async reassignAgentTasks(agentId: string): Promise<void> {
const agentAssignments = Array.from(this.assignments.values())
.filter(assignment =>
assignment.agentId === agentId &&
['assigned', 'in_progress'].includes(assignment.status)
);
for (const assignment of agentAssignments) {
// Add back to queue for reassignment
this.taskQueue.unshift(assignment.taskId);
assignment.status = 'failed';
logger.info({
taskId: assignment.taskId,
originalAgent: agentId
}, 'Task queued for reassignment');
}
await this.processTaskQueue();
}
/**
* Process queued tasks
*/
private async processTaskQueue(): Promise<void> {
while (this.taskQueue.length > 0) {
const taskId = this.taskQueue[0];
// Try to find an available agent
const availableAgents = Array.from(this.agents.values())
.filter(agent =>
agent.status === 'available' &&
agent.currentTasks.length < agent.maxConcurrentTasks
);
if (availableAgents.length === 0) {
break; // No available agents, stop processing
}
// Remove from queue and process
this.taskQueue.shift();
// Note: In a full implementation, we'd need to retrieve the task and context
// For now, just log that we're processing the queue
logger.debug({ taskId }, 'Processing queued task');
}
}
/**
* Start heartbeat monitoring
*/
private startHeartbeatMonitoring(): void {
if (!this.config.enableHealthChecks) {
return;
}
this.heartbeatTimer = setInterval(() => {
this.checkAgentHealth();
}, this.config.heartbeatInterval);
}
/**
* Check agent health and mark offline if needed
* Implements exponential backoff for heartbeat tolerance
*/
private checkAgentHealth(): void {
const now = new Date();
const baseHeartbeatInterval = this.config.heartbeatInterval;
for (const agent of this.agents.values()) {
const timeSinceHeartbeat = now.getTime() - agent.lastHeartbeat.getTime();
const agentId = agent.id;
// Get current missed heartbeat count
const missedCount = this.agentHeartbeatMisses.get(agentId) || 0;
// Calculate adaptive timeout with exponential backoff
const adaptiveTimeout = this.calculateAdaptiveHeartbeatTimeout(missedCount, baseHeartbeatInterval);
if (timeSinceHeartbeat > adaptiveTimeout) {
// Increment missed heartbeat count
const newMissedCount = missedCount + 1;
this.agentHeartbeatMisses.set(agentId, newMissedCount);
if (newMissedCount >= this.config.maxHeartbeatMisses && agent.status !== 'offline') {
// Mark agent as offline after maximum misses
agent.status = 'offline';
logger.warn({
agentId,
timeSinceHeartbeat,
missedHeartbeats: newMissedCount,
adaptiveTimeout
}, 'Agent marked as offline due to excessive missed heartbeats');
// Propagate offline status
this.integrationBridge.propagateStatusChange(agentId, 'offline', 'orchestrator')
.catch(error => {
logger.warn({ err: error, agentId }, 'Failed to propagate offline status from health check');
});
// Reassign tasks from offline agent
this.reassignAgentTasks(agentId).catch(error => {
logger.error({ err: error, agentId }, 'Failed to reassign tasks from offline agent');
});
// Reset missed count after marking offline
this.agentHeartbeatMisses.delete(agentId);
} else if (newMissedCount < this.config.maxHeartbeatMisses) {
// Log warning but don't mark offline yet
logger.warn({
agentId,
timeSinceHeartbeat,
missedHeartbeats: newMissedCount,
maxMisses: this.config.maxHeartbeatMisses,
adaptiveTimeout
}, 'Agent missed heartbeat - applying exponential backoff tolerance');
}
}
}
}
/**
* Calculate adaptive heartbeat timeout with exponential backoff
*/
private calculateAdaptiveHeartbeatTimeout(missedCount: number, baseInterval: number): number {
if (missedCount === 0) {
return baseInterval * this.config.heartbeatTimeoutMultiplier;
}
// Exponential backoff: each miss increases tolerance
const backoffMultiplier = Math.pow(1.5, Math.min(missedCount, 5)); // Cap at 5 for reasonable limits
return baseInterval * this.config.heartbeatTimeoutMultiplier * backoffMultiplier;
}
/**
* Get transport status for agent communication using dynamic port allocation
*/
getTransportStatus(): {
websocket: { available: boolean; port?: number; endpoint?: string };
http: { available: boolean; port?: number; endpoint?: string };
sse: { available: boolean; port?: number; endpoint?: string };
stdio: { available: boolean };
} {
if (this.communicationChannel && 'getTransportStatus' in this.communicationChannel &&
typeof (this.communicationChannel as { getTransportStatus?: unknown }).getTransportStatus === 'function') {
type TransportStatus = {
websocket: { available: boolean; port?: number; clients?: number };
http: { available: boolean; port?: number; endpoint?: string };
sse: { available: boolean; port?: number; endpoint?: string };
stdio: { available: boolean };
};
return (this.communicationChannel as { getTransportStatus: () => TransportStatus }).getTransportStatus();
}
// Fallback: get transport status directly from Transport Manager
try {
const allocatedPorts = transportManager.getAllocatedPorts();
const endpoints = transportManager.getServiceEndpoints();
return {
websocket: {
available: !!allocatedPorts.websocket,
port: allocatedPorts.websocket,
endpoint: endpoints.websocket
},
http: {
available: !!allocatedPorts.http,
port: allocatedPorts.http,
endpoint: endpoints.http
},
sse: {
available: !!allocatedPorts.sse,
port: allocatedPorts.sse,
endpoint: endpoints.sse
},
stdio: {
available: true // stdio is always available
}
};
} catch (error) {
logger.warn({ err: error }, 'Failed to get transport status from orchestrator');
return {
websocket: { available: false },
http: { available: false },
sse: { available: false },
stdio: { available: true }
};
}
}
/**
* Cleanup resources
*/
async destroy(): Promise<void> {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer);
}
// Clean up all execution monitors
for (const [executionId, handle] of this.executionMonitors.entries()) {
clearInterval(handle);
logger.debug({ executionId }, 'Cleaned up execution monitor');
}
this.executionMonitors.clear();
// Close communication channel
if (this.communicationChannel && typeof this.communicationChannel.close === 'function') {
await this.communicationChannel.close();
}
this.agents.clear();
this.assignments.clear();
this.activeExecutions.clear();
this.taskQueue = [];
AgentOrchestrator.instance = null;
logger.info('Agent orchestrator destroyed');
}
}