Skip to main content
Glama
hybrid-tools.ts17.8 kB
/** * ByteBot Hybrid Orchestration MCP Tools * * Advanced workflow tools that combine Agent API and Desktop API * for intelligent task monitoring, intervention, and multi-step workflows */ import { AgentClient } from '../clients/agent-client.js'; import { DesktopClient } from '../clients/desktop-client.js'; import { WebSocketClient } from '../clients/websocket-client.js'; import { Task, TaskStatus } from '../types/bytebot.js'; import { MonitorOptions, WorkflowStep, WorkflowExecutionResult, } from '../types/mcp.js'; import { formatErrorForMCP, sleep } from '../utils/error-handler.js'; import { EnvironmentConfig } from '../types/mcp.js'; /** * Tool definitions for hybrid orchestration */ export function getHybridTools() { return [ { name: 'bytebot_create_and_monitor_task', description: 'Create a task and monitor its progress until completion or intervention needed. ' + 'Automatically polls task status and returns when task reaches a terminal state ' + '(COMPLETED, NEEDS_HELP, NEEDS_REVIEW, FAILED, CANCELLED) or timeout is reached. ' + 'This is the recommended way to execute tasks when you want to wait for results.', inputSchema: { type: 'object' as const, properties: { description: { type: 'string', description: 'Natural language description of the task to execute', }, priority: { type: 'string', enum: ['LOW', 'MEDIUM', 'HIGH', 'URGENT'], description: 'Task priority level. Default: MEDIUM', default: 'MEDIUM', }, timeout: { type: 'number', description: 'Maximum time to wait for task completion in milliseconds. Default: 300000 (5 minutes)', default: 300000, }, pollInterval: { type: 'number', description: 'How often to check task status in milliseconds. Default: 2000 (2 seconds)', default: 2000, }, stopOnStatus: { type: 'array', items: { type: 'string', }, description: 'Stop monitoring when task reaches any of these statuses. Default: [COMPLETED, NEEDS_HELP, NEEDS_REVIEW, FAILED, CANCELLED]', }, }, required: ['description'], }, }, { name: 'bytebot_intervene_in_task', description: 'Provide intervention for a task in NEEDS_HELP state. ' + 'Send guidance to the task and optionally resume, cancel, or retry it. ' + 'Use this when a task is stuck and needs human input to proceed.', inputSchema: { type: 'object' as const, properties: { taskId: { type: 'string', description: 'ID of the task that needs intervention', }, message: { type: 'string', description: 'Intervention message with guidance or instructions for the task', }, action: { type: 'string', enum: ['resume', 'cancel', 'retry'], description: 'Action to take after intervention. Default: resume', default: 'resume', }, continueMonitoring: { type: 'boolean', description: 'Whether to continue monitoring the task after intervention. Default: true', default: true, }, timeout: { type: 'number', description: 'Maximum time to wait after intervention in milliseconds. Default: 300000 (5 minutes)', default: 300000, }, }, required: ['taskId', 'message'], }, }, { name: 'bytebot_execute_workflow', description: 'Execute a multi-step workflow with automatic task creation, monitoring, and error recovery. ' + 'Each step is executed as a separate task, with automatic intervention handling. ' + 'Use this for complex multi-step automation scenarios.', inputSchema: { type: 'object' as const, properties: { steps: { type: 'array', items: { type: 'object', properties: { name: { type: 'string', description: 'Step name (for logging)', }, description: { type: 'string', description: 'Task description for this step', }, timeout: { type: 'number', description: 'Step timeout in milliseconds', }, retryOnFailure: { type: 'boolean', description: 'Retry this step if it fails', }, maxRetries: { type: 'number', description: 'Maximum retry attempts', }, }, required: ['name', 'description'], }, description: 'Array of workflow steps to execute in sequence', }, priority: { type: 'string', enum: ['LOW', 'MEDIUM', 'HIGH', 'URGENT'], description: 'Priority for all tasks in the workflow. Default: MEDIUM', default: 'MEDIUM', }, stopOnFailure: { type: 'boolean', description: 'Stop workflow if any step fails. Default: true', default: true, }, }, required: ['steps'], }, }, { name: 'bytebot_monitor_task', description: 'Monitor an existing task until it reaches a terminal state or timeout. ' + 'Use this when you have already created a task and want to wait for its completion.', inputSchema: { type: 'object' as const, properties: { taskId: { type: 'string', description: 'ID of the task to monitor', }, timeout: { type: 'number', description: 'Maximum time to wait in milliseconds. Default: 300000 (5 minutes)', default: 300000, }, pollInterval: { type: 'number', description: 'How often to check task status in milliseconds. Default: 2000 (2 seconds)', default: 2000, }, stopOnStatus: { type: 'array', items: { type: 'string', }, description: 'Stop monitoring when task reaches any of these statuses', }, }, required: ['taskId'], }, }, ]; } /** * Hybrid orchestration service */ export class HybridOrchestrator { constructor( private agentClient: AgentClient, // @ts-expect-error - Reserved for future direct desktop control integration private _desktopClient: DesktopClient, // @ts-expect-error - Reserved for future WebSocket event integration private _wsClient: WebSocketClient | null, private config: EnvironmentConfig ) {} /** * Monitor a task until completion or timeout */ async monitorTask( taskId: string, options: MonitorOptions = {} ): Promise<Task> { const timeout = options.timeout || this.config.taskMonitorTimeout; const pollInterval = options.pollInterval || this.config.taskPollInterval; const stopOnStatus = options.stopOnStatus || [ 'COMPLETED', 'NEEDS_HELP', 'NEEDS_REVIEW', 'FAILED', 'CANCELLED', ]; const startTime = Date.now(); console.log(`[ByteBot MCP] Starting task monitoring: ${taskId}`); while (true) { // Check timeout if (Date.now() - startTime > timeout) { throw new Error( `Task monitoring timeout after ${timeout}ms. Task may still be running.` ); } // Fetch current task status const task = await this.agentClient.getTask(taskId, false); console.log( `[ByteBot MCP] Task ${taskId} status: ${task.status} (${Math.floor((Date.now() - startTime) / 1000)}s elapsed)` ); // Call status change callback if provided if (options.onStatusChange) { options.onStatusChange(task); } // Check if we should stop monitoring if (stopOnStatus.includes(task.status)) { console.log( `[ByteBot MCP] Task monitoring complete: ${task.status}` ); return task; } // Wait before next poll await sleep(pollInterval); } } /** * Create task and monitor until completion */ async createAndMonitorTask( description: string, priority: 'LOW' | 'MEDIUM' | 'HIGH' | 'URGENT' = 'MEDIUM', options: MonitorOptions = {} ): Promise<Task> { console.log('[ByteBot MCP] Creating and monitoring task...'); // Create task const createResponse = await this.agentClient.createTask({ description, priority, }); console.log(`[ByteBot MCP] Task created: ${createResponse.id}`); // Monitor task return await this.monitorTask(createResponse.id, options); } /** * Intervene in a task that needs help */ async interveneInTask( taskId: string, message: string, action: 'resume' | 'cancel' | 'retry' = 'resume', continueMonitoring = true, timeout = 300000 ): Promise<Task> { console.log(`[ByteBot MCP] Intervening in task ${taskId}: ${action}`); // Get current task state const task = await this.agentClient.getTask(taskId, false); if (task.status !== 'NEEDS_HELP') { throw new Error( `Cannot intervene in task with status ${task.status}. Task must be in NEEDS_HELP state.` ); } // Determine new status based on action let newStatus: TaskStatus; switch (action) { case 'resume': newStatus = 'IN_PROGRESS'; break; case 'cancel': newStatus = 'CANCELLED'; break; case 'retry': newStatus = 'PENDING'; break; } // Update task with intervention message const updatedTask = await this.agentClient.updateTask(taskId, { status: newStatus, message, }); console.log( `[ByteBot MCP] Task ${taskId} intervention applied: ${newStatus}` ); // Continue monitoring if requested and not cancelled if (continueMonitoring && action !== 'cancel') { return await this.monitorTask(taskId, { timeout }); } return updatedTask; } /** * Execute a multi-step workflow */ async executeWorkflow( steps: WorkflowStep[], priority: 'LOW' | 'MEDIUM' | 'HIGH' | 'URGENT' = 'MEDIUM', stopOnFailure = true ): Promise<WorkflowExecutionResult> { console.log( `[ByteBot MCP] Executing workflow with ${steps.length} steps` ); const result: WorkflowExecutionResult = { steps: [], overallStatus: 'completed', totalInterventions: 0, }; for (let i = 0; i < steps.length; i++) { const step = steps[i]; console.log( `[ByteBot MCP] Executing step ${i + 1}/${steps.length}: ${step.name}` ); const stepResult: WorkflowExecutionResult['steps'][0] = { name: step.name, taskId: '', status: 'PENDING', }; let retryCount = 0; const maxRetries = step.maxRetries || 0; while (retryCount <= maxRetries) { try { // Create and monitor task for this step const task = await this.createAndMonitorTask( step.description, priority, { timeout: step.timeout || this.config.taskMonitorTimeout, } ); stepResult.taskId = task.id; stepResult.status = task.status; stepResult.completedAt = task.completedAt; // Handle different terminal states if (task.status === 'NEEDS_HELP') { result.totalInterventions++; console.warn( `[ByteBot MCP] Step ${step.name} needs intervention. Manual intervention required.` ); stepResult.error = 'Task requires manual intervention'; if (stopOnFailure) { result.overallStatus = 'failed'; result.steps.push(stepResult); return result; } } else if (task.status === 'FAILED') { if (step.retryOnFailure && retryCount < maxRetries) { retryCount++; console.log( `[ByteBot MCP] Step ${step.name} failed, retrying (${retryCount}/${maxRetries})` ); continue; } stepResult.error = task.error || 'Task failed'; if (stopOnFailure) { result.overallStatus = 'failed'; result.steps.push(stepResult); return result; } } else if (task.status === 'COMPLETED') { console.log(`[ByteBot MCP] Step ${step.name} completed successfully`); } break; // Exit retry loop } catch (error) { stepResult.error = error instanceof Error ? error.message : String(error); if (step.retryOnFailure && retryCount < maxRetries) { retryCount++; console.log( `[ByteBot MCP] Step ${step.name} error, retrying (${retryCount}/${maxRetries})` ); continue; } if (stopOnFailure) { result.overallStatus = 'failed'; result.steps.push(stepResult); return result; } break; // Exit retry loop } } result.steps.push(stepResult); } // Determine overall status const hasFailures = result.steps.some( (s) => s.status === 'FAILED' || s.error ); const allCompleted = result.steps.every((s) => s.status === 'COMPLETED'); if (allCompleted) { result.overallStatus = 'completed'; } else if (hasFailures) { result.overallStatus = stopOnFailure ? 'failed' : 'partial'; } else { result.overallStatus = 'partial'; } console.log( `[ByteBot MCP] Workflow execution complete: ${result.overallStatus}` ); return result; } } /** * Tool handlers for hybrid orchestration */ export async function handleHybridTool( toolName: string, args: Record<string, unknown>, orchestrator: HybridOrchestrator ) { try { switch (toolName) { case 'bytebot_create_and_monitor_task': { const result = await orchestrator.createAndMonitorTask( args.description as string, (args.priority as any) || 'MEDIUM', { timeout: (args.timeout as number) || undefined, pollInterval: (args.pollInterval as number) || undefined, stopOnStatus: (args.stopOnStatus as string[]) || undefined, } ); return { content: [ { type: 'text', text: JSON.stringify( { taskId: result.id, finalStatus: result.status, completedAt: result.completedAt, messagesCount: result.messages.length, task: result, }, null, 2 ), }, ], }; } case 'bytebot_intervene_in_task': { const result = await orchestrator.interveneInTask( args.taskId as string, args.message as string, (args.action as any) || 'resume', args.continueMonitoring !== false, (args.timeout as number) || 300000 ); return { content: [ { type: 'text', text: JSON.stringify( { taskId: result.id, status: result.status, intervention: 'applied', task: result, }, null, 2 ), }, ], }; } case 'bytebot_execute_workflow': { const result = await orchestrator.executeWorkflow( args.steps as WorkflowStep[], (args.priority as any) || 'MEDIUM', args.stopOnFailure !== false ); return { content: [ { type: 'text', text: JSON.stringify(result, null, 2), }, ], }; } case 'bytebot_monitor_task': { const result = await orchestrator.monitorTask(args.taskId as string, { timeout: (args.timeout as number) || undefined, pollInterval: (args.pollInterval as number) || undefined, stopOnStatus: (args.stopOnStatus as string[]) || undefined, }); return { content: [ { type: 'text', text: JSON.stringify( { taskId: result.id, finalStatus: result.status, completedAt: result.completedAt, task: result, }, null, 2 ), }, ], }; } default: throw new Error(`Unknown tool: ${toolName}`); } } catch (error) { const errorInfo = formatErrorForMCP(error); return { content: [ { type: 'text', text: `Error: ${errorInfo.error}${errorInfo.details ? '\n\nDetails:\n' + errorInfo.details : ''}`, }, ], isError: true, }; } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sensuslab/spark-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server