// hybrid-tools.ts • 17.8 kB
/**
* ByteBot Hybrid Orchestration MCP Tools
*
* Advanced workflow tools that combine Agent API and Desktop API
* for intelligent task monitoring, intervention, and multi-step workflows
*/
import { AgentClient } from '../clients/agent-client.js';
import { DesktopClient } from '../clients/desktop-client.js';
import { WebSocketClient } from '../clients/websocket-client.js';
import { Task, TaskStatus } from '../types/bytebot.js';
import {
MonitorOptions,
WorkflowStep,
WorkflowExecutionResult,
} from '../types/mcp.js';
import { formatErrorForMCP, sleep } from '../utils/error-handler.js';
import { EnvironmentConfig } from '../types/mcp.js';
/**
 * Describes the hybrid orchestration tools exposed by this module.
 *
 * @returns A freshly built array with one descriptor (name, description and
 * JSON-schema style `inputSchema`) per `bytebot_*` hybrid tool, in the order:
 * create-and-monitor, intervene, execute-workflow, monitor.
 */
export function getHybridTools() {
  // Factories instead of shared constants: each property in each tool gets
  // its own object, exactly as a fully spelled-out literal would.
  const textField = (description: string) => ({ type: 'string', description });
  const numberField = (description: string, fallback: number) => ({
    type: 'number',
    description,
    default: fallback,
  });
  const booleanField = (description: string, fallback: boolean) => ({
    type: 'boolean',
    description,
    default: fallback,
  });
  const priorityField = (description: string) => ({
    type: 'string',
    enum: ['LOW', 'MEDIUM', 'HIGH', 'URGENT'],
    description,
    default: 'MEDIUM',
  });
  const statusListField = (description: string) => ({
    type: 'array',
    items: {
      type: 'string',
    },
    description,
  });

  const createAndMonitorTool = {
    name: 'bytebot_create_and_monitor_task',
    description: [
      'Create a task and monitor its progress until completion or intervention needed.',
      'Automatically polls task status and returns when task reaches a terminal state',
      '(COMPLETED, NEEDS_HELP, NEEDS_REVIEW, FAILED, CANCELLED) or timeout is reached.',
      'This is the recommended way to execute tasks when you want to wait for results.',
    ].join(' '),
    inputSchema: {
      type: 'object' as const,
      properties: {
        description: textField(
          'Natural language description of the task to execute'
        ),
        priority: priorityField('Task priority level. Default: MEDIUM'),
        timeout: numberField(
          'Maximum time to wait for task completion in milliseconds. Default: 300000 (5 minutes)',
          300000
        ),
        pollInterval: numberField(
          'How often to check task status in milliseconds. Default: 2000 (2 seconds)',
          2000
        ),
        stopOnStatus: statusListField(
          'Stop monitoring when task reaches any of these statuses. Default: [COMPLETED, NEEDS_HELP, NEEDS_REVIEW, FAILED, CANCELLED]'
        ),
      },
      required: ['description'],
    },
  };

  const interveneTool = {
    name: 'bytebot_intervene_in_task',
    description: [
      'Provide intervention for a task in NEEDS_HELP state.',
      'Send guidance to the task and optionally resume, cancel, or retry it.',
      'Use this when a task is stuck and needs human input to proceed.',
    ].join(' '),
    inputSchema: {
      type: 'object' as const,
      properties: {
        taskId: textField('ID of the task that needs intervention'),
        message: textField(
          'Intervention message with guidance or instructions for the task'
        ),
        action: {
          type: 'string',
          enum: ['resume', 'cancel', 'retry'],
          description: 'Action to take after intervention. Default: resume',
          default: 'resume',
        },
        continueMonitoring: booleanField(
          'Whether to continue monitoring the task after intervention. Default: true',
          true
        ),
        timeout: numberField(
          'Maximum time to wait after intervention in milliseconds. Default: 300000 (5 minutes)',
          300000
        ),
      },
      required: ['taskId', 'message'],
    },
  };

  const executeWorkflowTool = {
    name: 'bytebot_execute_workflow',
    description: [
      'Execute a multi-step workflow with automatic task creation, monitoring, and error recovery.',
      'Each step is executed as a separate task, with automatic intervention handling.',
      'Use this for complex multi-step automation scenarios.',
    ].join(' '),
    inputSchema: {
      type: 'object' as const,
      properties: {
        steps: {
          type: 'array',
          items: {
            type: 'object',
            properties: {
              name: textField('Step name (for logging)'),
              description: textField('Task description for this step'),
              timeout: {
                type: 'number',
                description: 'Step timeout in milliseconds',
              },
              retryOnFailure: {
                type: 'boolean',
                description: 'Retry this step if it fails',
              },
              maxRetries: {
                type: 'number',
                description: 'Maximum retry attempts',
              },
            },
            required: ['name', 'description'],
          },
          description: 'Array of workflow steps to execute in sequence',
        },
        priority: priorityField(
          'Priority for all tasks in the workflow. Default: MEDIUM'
        ),
        stopOnFailure: booleanField(
          'Stop workflow if any step fails. Default: true',
          true
        ),
      },
      required: ['steps'],
    },
  };

  const monitorTool = {
    name: 'bytebot_monitor_task',
    description: [
      'Monitor an existing task until it reaches a terminal state or timeout.',
      'Use this when you have already created a task and want to wait for its completion.',
    ].join(' '),
    inputSchema: {
      type: 'object' as const,
      properties: {
        taskId: textField('ID of the task to monitor'),
        timeout: numberField(
          'Maximum time to wait in milliseconds. Default: 300000 (5 minutes)',
          300000
        ),
        pollInterval: numberField(
          'How often to check task status in milliseconds. Default: 2000 (2 seconds)',
          2000
        ),
        stopOnStatus: statusListField(
          'Stop monitoring when task reaches any of these statuses'
        ),
      },
      required: ['taskId'],
    },
  };

  return [createAndMonitorTool, interveneTool, executeWorkflowTool, monitorTool];
}
/**
 * Hybrid orchestration service.
 *
 * Builds higher-level workflows on top of the Agent API client: polling a
 * task until it reaches a stop status, creating-and-monitoring in one call,
 * applying an intervention to a task in NEEDS_HELP, and running a sequence
 * of steps as individual tasks.
 *
 * Informational diagnostics are written to stderr, never stdout, so they
 * cannot interleave with protocol messages when the server is run over a
 * stdio transport.
 */
export class HybridOrchestrator {
  constructor(
    private agentClient: AgentClient,
    // @ts-expect-error - Reserved for future direct desktop control integration
    private _desktopClient: DesktopClient,
    // @ts-expect-error - Reserved for future WebSocket event integration
    private _wsClient: WebSocketClient | null,
    private config: EnvironmentConfig
  ) {}

  /**
   * Writes a prefixed informational line to stderr.
   */
  private logInfo(message: string): void {
    console.error(`[ByteBot MCP] ${message}`);
  }

  /**
   * Poll a task until it reaches one of the stop statuses or the timeout
   * elapses.
   *
   * @param taskId - ID of the task to poll.
   * @param options - Optional overrides. `timeout` and `pollInterval` fall
   * back to the configured values when missing or 0; a missing or empty
   * `stopOnStatus` falls back to COMPLETED, NEEDS_HELP, NEEDS_REVIEW, FAILED
   * and CANCELLED; `onStatusChange` is invoked with the task after every poll.
   * @returns The task as fetched on the poll that matched a stop status.
   * @throws Error when the timeout elapses first, or whatever
   * `agentClient.getTask` throws.
   */
  async monitorTask(
    taskId: string,
    options: MonitorOptions = {}
  ): Promise<Task> {
    const timeout = options.timeout || this.config.taskMonitorTimeout;
    const pollInterval = options.pollInterval || this.config.taskPollInterval;
    // An empty list could never match and would force every call to run
    // into the timeout, so it is treated the same as "not provided".
    const requestedStops = options.stopOnStatus || [];
    const stopOnStatus =
      requestedStops.length > 0
        ? requestedStops
        : ['COMPLETED', 'NEEDS_HELP', 'NEEDS_REVIEW', 'FAILED', 'CANCELLED'];
    const startTime = Date.now();
    this.logInfo(`Starting task monitoring: ${taskId}`);

    while (true) {
      // The deadline is checked before each poll.
      if (Date.now() - startTime > timeout) {
        throw new Error(
          `Task monitoring timeout after ${timeout}ms. Task may still be running.`
        );
      }

      const task = await this.agentClient.getTask(taskId, false);
      this.logInfo(
        `Task ${taskId} status: ${task.status} (${Math.floor((Date.now() - startTime) / 1000)}s elapsed)`
      );

      if (options.onStatusChange) {
        options.onStatusChange(task);
      }

      if (stopOnStatus.includes(task.status)) {
        this.logInfo(`Task monitoring complete: ${task.status}`);
        return task;
      }

      await sleep(pollInterval);
    }
  }

  /**
   * Create a task and monitor it until it reaches a stop status.
   *
   * @param description - Natural language description of the task.
   * @param priority - Task priority; defaults to MEDIUM.
   * @param options - Monitoring options forwarded to {@link monitorTask}.
   * @returns The task as returned by {@link monitorTask}.
   * @throws Whatever task creation or monitoring throws.
   */
  async createAndMonitorTask(
    description: string,
    priority: 'LOW' | 'MEDIUM' | 'HIGH' | 'URGENT' = 'MEDIUM',
    options: MonitorOptions = {}
  ): Promise<Task> {
    this.logInfo('Creating and monitoring task...');
    const createResponse = await this.agentClient.createTask({
      description,
      priority,
    });
    this.logInfo(`Task created: ${createResponse.id}`);
    return await this.monitorTask(createResponse.id, options);
  }

  /**
   * Apply an intervention to a task that is in NEEDS_HELP.
   *
   * @param taskId - ID of the task to intervene in.
   * @param message - Guidance sent along with the status update.
   * @param action - `resume` (IN_PROGRESS), `cancel` (CANCELLED) or `retry`
   * (PENDING); defaults to `resume`.
   * @param continueMonitoring - When true and the action is not `cancel`,
   * monitoring resumes after the update.
   * @param timeout - Monitoring timeout in milliseconds after the update.
   * @returns The monitored task, or the result of the update call when
   * monitoring is skipped.
   * @throws Error when `action` is not one of the supported values, or when
   * the task is not currently in NEEDS_HELP.
   */
  async interveneInTask(
    taskId: string,
    message: string,
    action: 'resume' | 'cancel' | 'retry' = 'resume',
    continueMonitoring = true,
    timeout = 300000
  ): Promise<Task> {
    const statusByAction: Record<'resume' | 'cancel' | 'retry', TaskStatus> = {
      resume: 'IN_PROGRESS',
      cancel: 'CANCELLED',
      retry: 'PENDING',
    };
    // Callers coming from untyped tool arguments can pass anything here;
    // reject unknown actions before touching the task instead of sending an
    // update with an undefined status.
    if (!Object.prototype.hasOwnProperty.call(statusByAction, action)) {
      throw new Error(
        `Invalid intervention action "${String(action)}". Expected one of: resume, cancel, retry.`
      );
    }

    this.logInfo(`Intervening in task ${taskId}: ${action}`);

    const task = await this.agentClient.getTask(taskId, false);
    if (task.status !== 'NEEDS_HELP') {
      throw new Error(
        `Cannot intervene in task with status ${task.status}. Task must be in NEEDS_HELP state.`
      );
    }

    const newStatus = statusByAction[action];
    const updatedTask = await this.agentClient.updateTask(taskId, {
      status: newStatus,
      message,
    });
    this.logInfo(`Task ${taskId} intervention applied: ${newStatus}`);

    // A cancelled task has nothing left to wait for.
    if (continueMonitoring && action !== 'cancel') {
      return await this.monitorTask(taskId, { timeout });
    }
    return updatedTask;
  }

  /**
   * Execute workflow steps in sequence, one task per step.
   *
   * @param steps - Steps to run in order.
   * @param priority - Priority applied to every created task.
   * @param stopOnFailure - When true, the workflow returns with
   * `overallStatus: 'failed'` as soon as a step ends in FAILED, NEEDS_HELP
   * or an exception (after its retries are exhausted).
   * @returns Per-step results, the number of steps that ended in NEEDS_HELP,
   * and an overall status: `completed` when every step is COMPLETED,
   * `failed` on an early stop, otherwise `partial`.
   */
  async executeWorkflow(
    steps: WorkflowStep[],
    priority: 'LOW' | 'MEDIUM' | 'HIGH' | 'URGENT' = 'MEDIUM',
    stopOnFailure = true
  ): Promise<WorkflowExecutionResult> {
    this.logInfo(`Executing workflow with ${steps.length} steps`);
    const result: WorkflowExecutionResult = {
      steps: [],
      overallStatus: 'completed',
      totalInterventions: 0,
    };

    for (let i = 0; i < steps.length; i++) {
      const step = steps[i];
      this.logInfo(`Executing step ${i + 1}/${steps.length}: ${step.name}`);

      const stepResult = await this.runWorkflowStep(step, priority);
      result.steps.push(stepResult);
      if (stepResult.status === 'NEEDS_HELP') {
        result.totalInterventions++;
      }
      // `error` is only set when the final attempt of a step did not succeed.
      if (stopOnFailure && stepResult.error !== undefined) {
        result.overallStatus = 'failed';
        return result;
      }
    }

    // Reaching this point means the workflow was not stopped early, so the
    // only distinction left is "everything completed" versus "something
    // did not".
    const allCompleted = result.steps.every((s) => s.status === 'COMPLETED');
    result.overallStatus = allCompleted ? 'completed' : 'partial';
    this.logInfo(`Workflow execution complete: ${result.overallStatus}`);
    return result;
  }

  /**
   * Runs one workflow step, retrying FAILED or throwing attempts while the
   * step's retry budget allows, and returns the record of the last attempt.
   */
  private async runWorkflowStep(
    step: WorkflowStep,
    priority: 'LOW' | 'MEDIUM' | 'HIGH' | 'URGENT'
  ): Promise<WorkflowExecutionResult['steps'][number]> {
    // Normalise the budget so negative, fractional or non-finite values can
    // neither skip the step entirely nor end the loop mid-retry.
    const requestedRetries = step.maxRetries || 0;
    const maxRetries =
      step.retryOnFailure && Number.isFinite(requestedRetries)
        ? Math.max(0, Math.floor(requestedRetries))
        : 0;

    for (let attempt = 0; ; attempt++) {
      const canRetry = attempt < maxRetries;
      // A fresh record per attempt keeps an earlier attempt's error, task id
      // or status from leaking into the result of a later one.
      const stepResult: WorkflowExecutionResult['steps'][number] = {
        name: step.name,
        taskId: '',
        status: 'PENDING',
      };

      try {
        // Create and monitor separately so the task id is recorded even if
        // monitoring throws (for example on timeout, when the task may still
        // be running and the caller needs the id to follow up).
        const created = await this.agentClient.createTask({
          description: step.description,
          priority,
        });
        stepResult.taskId = created.id;
        this.logInfo(`Task created: ${created.id}`);

        const task = await this.monitorTask(created.id, {
          timeout: step.timeout || this.config.taskMonitorTimeout,
        });
        stepResult.taskId = task.id;
        stepResult.status = task.status;
        stepResult.completedAt = task.completedAt;

        if (task.status === 'NEEDS_HELP') {
          console.warn(
            `[ByteBot MCP] Step ${step.name} needs intervention. Manual intervention required.`
          );
          stepResult.error = 'Task requires manual intervention';
        } else if (task.status === 'FAILED') {
          if (canRetry) {
            this.logInfo(
              `Step ${step.name} failed, retrying (${attempt + 1}/${maxRetries})`
            );
            continue;
          }
          stepResult.error = task.error || 'Task failed';
        } else if (task.status === 'COMPLETED') {
          this.logInfo(`Step ${step.name} completed successfully`);
        }
        return stepResult;
      } catch (error) {
        if (canRetry) {
          this.logInfo(
            `Step ${step.name} error, retrying (${attempt + 1}/${maxRetries})`
          );
          continue;
        }
        const message = error instanceof Error ? error.message : String(error);
        // Never leave `error` empty: callers use its presence as the
        // failure marker.
        stepResult.error = message || 'Unknown error';
        return stepResult;
      }
    }
  }
}
/** Priorities accepted by the task-creating tools. */
const HYBRID_TASK_PRIORITIES = ['LOW', 'MEDIUM', 'HIGH', 'URGENT'] as const;

/** Actions accepted by `bytebot_intervene_in_task`. */
const HYBRID_INTERVENTION_ACTIONS = ['resume', 'cancel', 'retry'] as const;

/** Returns `args[field]` when it is a non-blank string; throws otherwise. */
function requireHybridString(
  args: Record<string, unknown>,
  field: string
): string {
  const value = args[field];
  if (typeof value !== 'string' || value.trim() === '') {
    throw new Error(
      `Missing or invalid "${field}": expected a non-empty string`
    );
  }
  return value;
}

/** Resolves an optional enumerated argument, using `fallback` when it is absent or empty. */
function parseHybridChoice<T extends string>(
  value: unknown,
  field: string,
  allowed: readonly T[],
  fallback: T
): T {
  if (value === undefined || value === null || value === '') {
    return fallback;
  }
  const match = allowed.find((candidate) => candidate === value);
  if (match === undefined) {
    throw new Error(
      `Invalid "${field}": ${JSON.stringify(value)}. Expected one of: ${allowed.join(', ')}`
    );
  }
  return match;
}

/** Reads an optional millisecond value; absent or 0 means "use the default". */
function optionalHybridDuration(
  args: Record<string, unknown>,
  field: string
): number | undefined {
  const value = args[field];
  if (value === undefined || value === null || value === 0) {
    return undefined;
  }
  if (typeof value !== 'number' || !Number.isFinite(value) || value < 0) {
    throw new Error(
      `Invalid "${field}": expected a positive number of milliseconds`
    );
  }
  return value;
}

/** Reads the optional `stopOnStatus` list; absent or empty means "use the default". */
function optionalHybridStatuses(
  args: Record<string, unknown>
): string[] | undefined {
  const value = args.stopOnStatus;
  if (value === undefined || value === null) {
    return undefined;
  }
  if (!Array.isArray(value)) {
    throw new Error('Invalid "stopOnStatus": expected an array of strings');
  }
  const statuses = value.filter(
    (entry): entry is string => typeof entry === 'string'
  );
  if (statuses.length !== value.length) {
    throw new Error('Invalid "stopOnStatus": expected an array of strings');
  }
  return statuses.length > 0 ? statuses : undefined;
}

/** Checks that `steps` is an array of objects carrying string `name` and `description`. */
function requireHybridSteps(args: Record<string, unknown>): WorkflowStep[] {
  const value = args.steps;
  if (!Array.isArray(value)) {
    throw new Error('Missing or invalid "steps": expected an array');
  }
  value.forEach((entry: unknown, index: number) => {
    const candidate =
      typeof entry === 'object' && entry !== null
        ? (entry as Record<string, unknown>)
        : undefined;
    if (
      candidate === undefined ||
      typeof candidate.name !== 'string' ||
      typeof candidate.description !== 'string'
    ) {
      throw new Error(
        `Invalid step at index ${index}: "name" and "description" must be strings`
      );
    }
  });
  // Shape verified above; remaining step fields are optional.
  return value as WorkflowStep[];
}

/** Wraps a payload as a single pretty-printed JSON text content item. */
function hybridJsonResult(payload: unknown) {
  return {
    content: [
      {
        type: 'text',
        text: JSON.stringify(payload, null, 2),
      },
    ],
  };
}

/**
 * Tool handlers for hybrid orchestration.
 *
 * Validates the raw tool arguments, dispatches to the matching orchestrator
 * method and wraps the outcome as MCP tool content. Never rejects for
 * argument, orchestration or unknown-tool errors: those are returned as a
 * result with `isError: true` and a formatted message.
 *
 * @param toolName - Name of the tool being invoked.
 * @param args - Raw, unvalidated tool arguments.
 * @param orchestrator - Orchestrator that performs the work.
 * @returns A content result holding pretty-printed JSON on success, or an
 * error text with `isError: true` on failure.
 */
export async function handleHybridTool(
  toolName: string,
  args: Record<string, unknown>,
  orchestrator: HybridOrchestrator
) {
  try {
    switch (toolName) {
      case 'bytebot_create_and_monitor_task': {
        const result = await orchestrator.createAndMonitorTask(
          requireHybridString(args, 'description'),
          parseHybridChoice(
            args.priority,
            'priority',
            HYBRID_TASK_PRIORITIES,
            'MEDIUM'
          ),
          {
            timeout: optionalHybridDuration(args, 'timeout'),
            pollInterval: optionalHybridDuration(args, 'pollInterval'),
            stopOnStatus: optionalHybridStatuses(args),
          }
        );
        return hybridJsonResult({
          taskId: result.id,
          finalStatus: result.status,
          completedAt: result.completedAt,
          // The monitored task may have been fetched without its messages;
          // a missing list must not turn a finished task into an error.
          messagesCount: result.messages?.length ?? 0,
          task: result,
        });
      }
      case 'bytebot_intervene_in_task': {
        const result = await orchestrator.interveneInTask(
          requireHybridString(args, 'taskId'),
          requireHybridString(args, 'message'),
          parseHybridChoice(
            args.action,
            'action',
            HYBRID_INTERVENTION_ACTIONS,
            'resume'
          ),
          args.continueMonitoring !== false,
          optionalHybridDuration(args, 'timeout') ?? 300000
        );
        return hybridJsonResult({
          taskId: result.id,
          status: result.status,
          intervention: 'applied',
          task: result,
        });
      }
      case 'bytebot_execute_workflow': {
        const result = await orchestrator.executeWorkflow(
          requireHybridSteps(args),
          parseHybridChoice(
            args.priority,
            'priority',
            HYBRID_TASK_PRIORITIES,
            'MEDIUM'
          ),
          args.stopOnFailure !== false
        );
        return hybridJsonResult(result);
      }
      case 'bytebot_monitor_task': {
        const result = await orchestrator.monitorTask(
          requireHybridString(args, 'taskId'),
          {
            timeout: optionalHybridDuration(args, 'timeout'),
            pollInterval: optionalHybridDuration(args, 'pollInterval'),
            stopOnStatus: optionalHybridStatuses(args),
          }
        );
        return hybridJsonResult({
          taskId: result.id,
          finalStatus: result.status,
          completedAt: result.completedAt,
          task: result,
        });
      }
      default:
        throw new Error(`Unknown tool: ${toolName}`);
    }
  } catch (error) {
    const errorInfo = formatErrorForMCP(error);
    return {
      content: [
        {
          type: 'text',
          text: `Error: ${errorInfo.error}${errorInfo.details ? '\n\nDetails:\n' + errorInfo.details : ''}`,
        },
      ],
      isError: true,
    };
  }
}