hub-orchestrator.ts (22.6 kB)
/**
 * Hub Orchestrator Module
 *
 * Author: Yobie Benjamin
 * Version: 0.2
 * Date: July 28, 2025
 *
 * The HubOrchestrator is the brain of the Llama Maverick Hub.
 * It coordinates between Llama AI and multiple MCP services,
 * handling complex workflows and multi-service operations.
 */

import { EventEmitter } from 'eventemitter3';
import PQueue from 'p-queue';
import { v4 as uuidv4 } from 'uuid';
import winston from 'winston';
import { LlamaService } from '../services/llama-service.js';
import { ServiceRegistry } from '../registry/service-registry.js';
import { MCPClientManager } from '../clients/mcp-client-manager.js';
import type { Tool, Resource, Prompt } from '@modelcontextprotocol/sdk/types.js';

const logger = winston.createLogger({
  level: 'debug',
  format: winston.format.simple()
});

/**
 * Workflow definition for multi-service operations
 * Allows chaining of operations across different MCP services
 */
interface Workflow {
  id: string;
  name: string;
  description: string;
  steps: WorkflowStep[];
  context: Map<string, any>;
}

/**
 * Individual step in a workflow
 * Can target any connected MCP service
 */
interface WorkflowStep {
  id: string;
  service: string;
  tool: string;
  arguments: Record<string, any>;
  dependsOn?: string[];
  retryPolicy?: {
    maxRetries: number;
    backoffMs: number;
  };
}

/**
 * Result from executing a workflow
 * Aggregates results from all steps
 */
interface WorkflowResult {
  workflowId: string;
  success: boolean;
  steps: Map<string, any>;
  finalResult?: any;
  errors?: string[];
}

/**
 * Main orchestrator class that coordinates all hub operations
 * Implements intelligent routing, workflow management, and AI-driven decision making
 */
export class HubOrchestrator extends EventEmitter {
  private llamaService: LlamaService;
  private registry: ServiceRegistry;
  private clientManager: MCPClientManager;
  private workflows: Map<string, Workflow>;
  private executionQueue: PQueue;
  private toolCache: Map<string, Tool>;
  private resourceCache: Map<string, Resource>;

  constructor(
    llamaService: LlamaService,
    registry: ServiceRegistry,
    clientManager: MCPClientManager
  ) {
    super();

    /**
     * Initialize core dependencies
     * These components work together to enable multi-service orchestration
     */
    this.llamaService = llamaService;
    this.registry = registry;
    this.clientManager = clientManager;

    /**
     * Initialize internal state management
     * Caches and queues optimize performance and reliability
     */
    this.workflows = new Map();
    this.toolCache = new Map();
    this.resourceCache = new Map();

    /**
     * Create execution queue for managing concurrent operations
     * Prevents overwhelming connected services with too many requests
     */
    this.executionQueue = new PQueue({
      concurrency: 5,
      interval: 1000,
      intervalCap: 10
    });

    this.initializeBuiltInWorkflows();
  }

  /**
   * Initialize pre-defined workflows for common multi-service operations
   * These workflows demonstrate the power of orchestrating multiple MCP services
   */
  private initializeBuiltInWorkflows(): void {
    // Example: Customer onboarding workflow using Stripe + Database + Email
    this.registerWorkflow({
      id: 'customer-onboarding',
      name: 'Customer Onboarding',
      description: 'Complete customer onboarding with payment setup',
      steps: [
        {
          id: 'create-customer',
          service: 'stripe',
          tool: 'stripe_create_customer',
          arguments: {},
          retryPolicy: { maxRetries: 3, backoffMs: 1000 }
        },
        {
          id: 'setup-subscription',
          service: 'stripe',
          tool: 'stripe_create_subscription',
          arguments: {},
          dependsOn: ['create-customer']
        },
        {
          id: 'store-in-db',
          service: 'database',
          tool: 'db_insert',
          arguments: {},
          dependsOn: ['create-customer', 'setup-subscription']
        }
      ],
      context: new Map()
    });

    logger.info('Initialized built-in workflows');
  }

  /**
   * Discover and register tools from all connected MCP services
   * This creates a unified tool registry across all services
   */
  async discoverAndRegisterTools(): Promise<void> {
    logger.info('Discovering tools from connected services...');

    const services = this.registry.listServices();

    for (const serviceId of services) {
      try {
        /**
         * Query each service for its available tools
         * Tools are namespaced by service to avoid conflicts
         */
        const tools = await this.clientManager.listServiceTools(serviceId);

        for (const tool of tools) {
          // Namespace tool name with service ID
          const namespacedName = `${serviceId}_${tool.name}`;
          const namespacedTool: Tool = {
            ...tool,
            name: namespacedName,
            description: `[${serviceId}] ${tool.description}`
          };

          this.toolCache.set(namespacedName, namespacedTool);
          logger.debug(`Registered tool: ${namespacedName}`);
        }
      } catch (error) {
        logger.error(`Failed to discover tools from ${serviceId}:`, error);
      }
    }

    /**
     * Register hub-specific orchestration tools
     * These tools provide workflow and multi-service capabilities
     */
    this.registerHubTools();
  }

  /**
   * Register special hub-level tools for orchestration
   * These tools enable complex multi-service workflows
   */
  private registerHubTools(): void {
    // Tool for executing workflows
    this.toolCache.set('hub_execute_workflow', {
      name: 'hub_execute_workflow',
      description: 'Execute a multi-service workflow',
      inputSchema: {
        type: 'object',
        properties: {
          workflowId: {
            type: 'string',
            description: 'ID of the workflow to execute'
          },
          parameters: {
            type: 'object',
            description: 'Workflow parameters'
          }
        },
        required: ['workflowId']
      }
    });

    // Tool for AI-driven service selection
    this.toolCache.set('hub_smart_route', {
      name: 'hub_smart_route',
      description: 'Use Llama AI to intelligently route requests to the best service',
      inputSchema: {
        type: 'object',
        properties: {
          query: {
            type: 'string',
            description: 'Natural language request'
          },
          context: {
            type: 'object',
            description: 'Additional context'
          }
        },
        required: ['query']
      }
    });

    // Tool for parallel service queries
    this.toolCache.set('hub_parallel_query', {
      name: 'hub_parallel_query',
      description: 'Query multiple services in parallel and aggregate results',
      inputSchema: {
        type: 'object',
        properties: {
          services: {
            type: 'array',
            items: { type: 'string' },
            description: 'Services to query'
          },
          query: {
            type: 'string',
            description: 'Query to send to each service'
          }
        },
        required: ['services', 'query']
      }
    });
  }

  /**
   * Get all available tools from all connected services
   * Returns a unified tool list for Claude to use
   */
  async getAggregatedTools(): Promise<Tool[]> {
    return Array.from(this.toolCache.values());
  }

  /**
   * Execute a tool, routing to the appropriate service
   * Handles both service-specific tools and hub orchestration tools
   */
  async executeTool(name: string, args: Record<string, any>): Promise<any> {
    logger.info(`Executing tool: ${name}`, { args });

    /**
     * Handle hub-specific orchestration tools
     * These tools coordinate multiple services
     */
    if (name.startsWith('hub_')) {
      return this.executeHubTool(name, args);
    }

    /**
     * Route to appropriate MCP service
     * Tool names are namespaced as "service_toolname"
     */
    const [serviceId, ...toolParts] = name.split('_');
    const actualToolName = toolParts.join('_');

    if (!this.registry.hasService(serviceId)) {
      throw new Error(`Service not found: ${serviceId}`);
    }

    try {
      /**
       * Execute tool through MCP client
       * The client manager handles the actual MCP protocol communication
       */
      const result = await this.clientManager.executeServiceTool(
        serviceId,
        actualToolName,
        args
      );

      /**
       * Emit event for monitoring and logging
       * Other components can listen to these events
       */
      this.emit('tool:executed', {
        service: serviceId,
        tool: actualToolName,
        success: true
      });

      return result;
    } catch (error) {
      logger.error(`Tool execution failed: ${name}`, error);

      this.emit('tool:failed', {
        service: serviceId,
        tool: actualToolName,
        error
      });

      throw error;
    }
  }

  /**
   * Execute hub-specific orchestration tools
   * These enable advanced multi-service capabilities
   */
  private async executeHubTool(name: string, args: Record<string, any>): Promise<any> {
    switch (name) {
      case 'hub_execute_workflow':
        return this.executeWorkflow(args.workflowId, args.parameters || {});

      case 'hub_smart_route':
        return this.smartRoute(args.query, args.context || {});

      case 'hub_parallel_query':
        return this.parallelQuery(args.services, args.query);

      default:
        throw new Error(`Unknown hub tool: ${name}`);
    }
  }

  /**
   * Execute a multi-step workflow across services
   * Manages dependencies, retries, and context passing
   */
  async executeWorkflow(workflowId: string, parameters: Record<string, any>): Promise<WorkflowResult> {
    const workflow = this.workflows.get(workflowId);
    if (!workflow) {
      throw new Error(`Workflow not found: ${workflowId}`);
    }

    logger.info(`Executing workflow: ${workflowId}`);

    const result: WorkflowResult = {
      workflowId: workflow.id,
      success: true,
      steps: new Map(),
      errors: []
    };

    /**
     * Initialize workflow context with parameters
     * Context is passed between steps for data sharing
     */
    workflow.context.set('parameters', parameters);

    /**
     * Execute workflow steps respecting dependencies
     * Uses topological sort to determine execution order
     */
    const executionOrder = this.topologicalSort(workflow.steps);

    for (const stepId of executionOrder) {
      const step = workflow.steps.find(s => s.id === stepId)!;

      try {
        /**
         * Check if dependencies are satisfied
         * Skip if any dependency failed
         */
        if (step.dependsOn) {
          const depsFailed = step.dependsOn.some(
            depId => !result.steps.has(depId)
          );
          if (depsFailed) {
            result.errors?.push(`Step ${stepId} skipped due to failed dependencies`);
            continue;
          }
        }

        /**
         * Prepare arguments with context substitution
         * Allows passing data between workflow steps
         */
        const preparedArgs = this.prepareStepArguments(
          step.arguments,
          workflow.context
        );

        /**
         * Execute step with retry policy
         * Handles transient failures gracefully
         */
        const stepResult = await this.executeStepWithRetry(
          step,
          preparedArgs
        );

        result.steps.set(stepId, stepResult);
        workflow.context.set(stepId, stepResult);
      } catch (error) {
        logger.error(`Workflow step failed: ${stepId}`, error);
        result.success = false;
        result.errors?.push(`Step ${stepId} failed: ${error}`);
        // Continue with other steps that don't depend on this one
      }
    }

    /**
     * Prepare final result using Llama to synthesize
     * AI summarizes the workflow execution
     */
    if (result.success) {
      result.finalResult = await this.synthesizeWorkflowResult(
        workflow,
        result.steps
      );
    }

    return result;
  }

  /**
   * Use Llama AI to intelligently route requests to the best service
   * Analyzes the query and available services to make routing decisions
   */
  private async smartRoute(query: string, context: Record<string, any>): Promise<any> {
    logger.info('Smart routing query:', { query });

    /**
     * Ask Llama to analyze the query and determine best service
     * Llama understands the capabilities of each connected service
     */
    const analysis = await this.llamaService.complete(
      `Analyze this request and determine which MCP service should handle it:

Query: ${query}

Available services: ${this.registry.listServices().join(', ')}

Context: ${JSON.stringify(context)}

Respond with JSON: { "service": "service_name", "tool": "tool_name", "arguments": {} }`
    );

    try {
      const routing = JSON.parse(analysis);

      /**
       * Execute the tool on the selected service
       * Llama's decision drives the routing
       */
      return this.executeTool(
        `${routing.service}_${routing.tool}`,
        routing.arguments
      );
    } catch (error) {
      logger.error('Smart routing failed:', error);
      throw new Error('Failed to route request intelligently');
    }
  }

  /**
   * Query multiple services in parallel and aggregate results
   * Useful for gathering information from multiple sources
   */
  private async parallelQuery(services: string[], query: string): Promise<any> {
    logger.info('Executing parallel query:', { services, query });

    /**
     * Create parallel execution promises
     * Each service is queried simultaneously
     */
    const queries = services.map(async (serviceId) => {
      try {
        // Find appropriate tool for the query on this service
        const tools = await this.clientManager.listServiceTools(serviceId);
        const queryTool = tools.find(t =>
          t.name.includes('query') ||
          t.name.includes('search') ||
          t.name.includes('get')
        );

        if (!queryTool) {
          return { service: serviceId, error: 'No query tool available' };
        }

        const result = await this.clientManager.executeServiceTool(
          serviceId,
          queryTool.name,
          { query }
        );

        return { service: serviceId, result };
      } catch (error) {
        return { service: serviceId, error: String(error) };
      }
    });

    /**
     * Wait for all queries to complete
     * Results are collected regardless of individual failures
     */
    const results = await Promise.allSettled(queries);

    /**
     * Use Llama to synthesize and summarize results
     * AI provides intelligent aggregation of multi-source data
     */
    const aggregated = await this.llamaService.complete(
      `Synthesize these query results from multiple services:

${JSON.stringify(results, null, 2)}

Provide a comprehensive summary.`
    );

    return {
      services: services,
      query: query,
      results: results,
      synthesis: aggregated
    };
  }

  /**
   * Get aggregated resources from all services
   * Provides unified resource access across services
   */
  async getAggregatedResources(): Promise<Resource[]> {
    const resources: Resource[] = [];

    for (const serviceId of this.registry.listServices()) {
      try {
        const serviceResources = await this.clientManager.listServiceResources(serviceId);

        // Namespace resources by service
        resources.push(...serviceResources.map(r => ({
          ...r,
          uri: `${serviceId}://${r.uri}`,
          name: `[${serviceId}] ${r.name}`
        })));
      } catch (error) {
        logger.error(`Failed to get resources from ${serviceId}:`, error);
      }
    }

    return resources;
  }

  /**
   * Read a resource from the appropriate service
   * Routes based on the service namespace in the URI
   */
  async readResource(uri: string): Promise<string> {
    // Parse service from URI (format: "service://path")
    const match = uri.match(/^([^:]+):\/\/(.+)$/);
    if (!match) {
      throw new Error(`Invalid resource URI: ${uri}`);
    }

    const [, serviceId, actualUri] = match;
    return this.clientManager.readServiceResource(serviceId, actualUri);
  }

  /**
   * Get aggregated prompts from all services
   * Combines prompts from hub and all connected services
   */
  async getAggregatedPrompts(): Promise<Prompt[]> {
    const prompts: Prompt[] = [];

    // Add hub-level prompts
    prompts.push({
      name: 'orchestrate',
      description: 'Orchestrate a multi-service workflow',
      arguments: [
        {
          name: 'goal',
          description: 'What you want to accomplish',
          required: true
        }
      ]
    });

    // Gather prompts from services
    for (const serviceId of this.registry.listServices()) {
      try {
        const servicePrompts = await this.clientManager.listServicePrompts(serviceId);
        prompts.push(...servicePrompts.map(p => ({
          ...p,
          name: `${serviceId}_${p.name}`,
          description: `[${serviceId}] ${p.description}`
        })));
      } catch (error) {
        logger.error(`Failed to get prompts from ${serviceId}:`, error);
      }
    }

    return prompts;
  }

  /**
   * Get a specific prompt
   * Routes to the appropriate service or returns hub prompt
   */
  async getPrompt(name: string, args?: Record<string, string>): Promise<any> {
    if (name === 'orchestrate') {
      return {
        messages: [
          {
            role: 'user',
            content: {
              type: 'text',
              text: args?.goal || 'Help me accomplish a complex task'
            }
          }
        ]
      };
    }

    // Route to service
    const [serviceId, ...promptParts] = name.split('_');
    const actualPromptName = promptParts.join('_');

    return this.clientManager.getServicePrompt(serviceId, actualPromptName, args);
  }

  /**
   * Handle completion requests using Llama with multi-service context
   * Enriches completions with data from connected services
   */
  async complete(ref: any, argument: any): Promise<any> {
    // Gather relevant context from services
    const context = await this.gatherCompletionContext(ref);

    // Use Llama with enriched context
    return this.llamaService.completeWithContext(argument, context);
  }

  /**
   * Register a new workflow for orchestration
   * Workflows define multi-step, multi-service operations
   */
  registerWorkflow(workflow: Workflow): void {
    this.workflows.set(workflow.id, workflow);
    logger.info(`Registered workflow: ${workflow.id}`);
  }

  /**
   * Helper: Topological sort for workflow step ordering
   * Ensures dependencies are executed before dependent steps
   */
  private topologicalSort(steps: WorkflowStep[]): string[] {
    const visited = new Set<string>();
    const result: string[] = [];

    const visit = (stepId: string) => {
      if (visited.has(stepId)) return;
      visited.add(stepId);

      const step = steps.find(s => s.id === stepId);
      if (step?.dependsOn) {
        for (const dep of step.dependsOn) {
          visit(dep);
        }
      }

      result.push(stepId);
    };

    for (const step of steps) {
      visit(step.id);
    }

    return result;
  }

  /**
   * Helper: Prepare step arguments with context substitution
   * Allows passing data between workflow steps
   */
  private prepareStepArguments(
    args: Record<string, any>,
    context: Map<string, any>
  ): Record<string, any> {
    const prepared: Record<string, any> = {};

    for (const [key, value] of Object.entries(args)) {
      if (typeof value === 'string' && value.startsWith('{{') && value.endsWith('}}')) {
        // Context reference: {{stepId.field}}
        const ref = value.slice(2, -2);
        const [contextKey, ...path] = ref.split('.');

        let contextValue = context.get(contextKey);
        for (const p of path) {
          contextValue = contextValue?.[p];
        }

        prepared[key] = contextValue;
      } else {
        prepared[key] = value;
      }
    }

    return prepared;
  }

  /**
   * Helper: Execute step with retry policy
   * Handles transient failures with exponential backoff
   */
  private async executeStepWithRetry(
    step: WorkflowStep,
    args: Record<string, any>
  ): Promise<any> {
    const retryPolicy = step.retryPolicy || { maxRetries: 1, backoffMs: 0 };
    let lastError: Error | undefined;

    for (let attempt = 0; attempt <= retryPolicy.maxRetries; attempt++) {
      try {
        if (attempt > 0) {
          // Exponential backoff
          await new Promise(resolve =>
            setTimeout(resolve, retryPolicy.backoffMs * Math.pow(2, attempt - 1))
          );
        }

        return await this.executeTool(
          `${step.service}_${step.tool}`,
          args
        );
      } catch (error) {
        lastError = error as Error;
        logger.warn(`Step ${step.id} attempt ${attempt + 1} failed:`, error);
      }
    }

    throw lastError;
  }

  /**
   * Helper: Use Llama to synthesize workflow results
   * Creates intelligent summaries of multi-step operations
   */
  private async synthesizeWorkflowResult(
    workflow: Workflow,
    steps: Map<string, any>
  ): Promise<any> {
    const prompt = `Synthesize the results of this workflow execution:

Workflow: ${workflow.name}
Description: ${workflow.description}

Step Results:
${Array.from(steps.entries())
  .map(([id, result]) => `${id}: ${JSON.stringify(result, null, 2)}`)
  .join('\n')}

Provide a comprehensive summary of what was accomplished.`;

    return this.llamaService.complete(prompt);
  }

  /**
   * Helper: Gather context from services for completions
   * Enriches AI responses with real-time service data
   */
  private async gatherCompletionContext(ref: any): Promise<Record<string, any>> {
    const context: Record<string, any> = {};

    // Gather relevant data from each service
    for (const serviceId of this.registry.listServices()) {
      try {
        // Service-specific context gathering logic
        context[serviceId] = await this.clientManager.getServiceContext(serviceId);
      } catch (error) {
        logger.warn(`Failed to gather context from ${serviceId}:`, error);
      }
    }

    return context;
  }
}
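
For reference, a minimal usage sketch of the orchestrator follows. The import paths, the no-argument constructors for LlamaService, ServiceRegistry, and MCPClientManager, and the entire 'invoice-payment' workflow (its service, tool names, arguments, and the `id` field read by `{{create-customer.id}}`) are illustrative assumptions, not part of this module; only HubOrchestrator's own methods are taken from the code above.

// usage-sketch.ts — hypothetical caller; paths and constructor signatures are assumed.
import { HubOrchestrator } from './orchestrator/hub-orchestrator.js';
import { LlamaService } from './services/llama-service.js';
import { ServiceRegistry } from './registry/service-registry.js';
import { MCPClientManager } from './clients/mcp-client-manager.js';

async function main(): Promise<void> {
  // Assumption: these collaborators can be constructed without arguments.
  const orchestrator = new HubOrchestrator(
    new LlamaService(),
    new ServiceRegistry(),
    new MCPClientManager()
  );

  // Build the unified, namespaced tool registry from all connected services.
  await orchestrator.discoverAndRegisterTools();

  // Register a hypothetical workflow. The '{{create-customer.id}}' string is
  // resolved by prepareStepArguments from the stored result of the
  // 'create-customer' step, assuming that result carries an `id` field.
  orchestrator.registerWorkflow({
    id: 'invoice-payment',
    name: 'Invoice Payment',
    description: 'Create a customer, then invoice them',
    steps: [
      {
        id: 'create-customer',
        service: 'stripe',
        tool: 'stripe_create_customer', // assumed tool name on the stripe service
        arguments: { email: 'ada@example.com' },
        retryPolicy: { maxRetries: 3, backoffMs: 1000 }
      },
      {
        id: 'create-invoice',
        service: 'stripe',
        tool: 'stripe_create_invoice', // assumed tool name on the stripe service
        arguments: { customer: '{{create-customer.id}}' },
        dependsOn: ['create-customer']
      }
    ],
    context: new Map()
  });

  const result = await orchestrator.executeWorkflow('invoice-payment', {});
  console.log(result.success, result.errors, result.finalResult);
}

main().catch(err => {
  console.error(err);
  process.exit(1);
});

Note that executeWorkflow stores each step's result in the workflow context under the step's id, which is what makes the `{{stepId.field}}` references in later steps resolve.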

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/YobieBen/llama-maverick-hub-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.