/**
* Hub Orchestrator Module
*
* Author: Yobie Benjamin
* Version: 0.2
* Date: July 28, 2025
*
* The HubOrchestrator is the brain of the Llama Maverick Hub.
* It coordinates between Llama AI and multiple MCP services,
* handling complex workflows and multi-service operations.
*/
import { EventEmitter } from 'eventemitter3';
import PQueue from 'p-queue';
import { v4 as uuidv4 } from 'uuid';
import winston from 'winston';
import { LlamaService } from '../services/llama-service.js';
import { ServiceRegistry } from '../registry/service-registry.js';
import { MCPClientManager } from '../clients/mcp-client-manager.js';
import type { Tool, Resource, Prompt } from '@modelcontextprotocol/sdk/types.js';
/**
 * Module-level logger for the orchestrator.
 *
 * A winston logger created without any transport has nowhere to write and
 * emits an "Attempt to write logs with no transports" warning on every call,
 * so a Console transport is attached explicitly.
 *
 * Every level is routed to stderr so that stdout stays free of log lines.
 * NOTE(review): this presumes the hub may speak MCP over stdio, where stdout
 * carries protocol traffic — confirm against the server entry point.
 */
const logger = winston.createLogger({
  level: 'debug',
  format: winston.format.simple(),
  transports: [
    new winston.transports.Console({
      stderrLevels: ['error', 'warn', 'info', 'http', 'verbose', 'debug', 'silly']
    })
  ]
});
/**
 * A registered multi-service workflow.
 *
 * A workflow is an ordered-by-dependency collection of steps, each of which
 * targets a tool on one connected MCP service.
 */
interface Workflow {
  /** Identifier the workflow is registered and looked up under. */
  id: string;

  /** Human-readable name; included in the synthesis prompt sent to Llama. */
  name: string;

  /** Short explanation of the workflow; included in the synthesis prompt. */
  description: string;

  /** Steps making up the workflow; run order is derived from `dependsOn`. */
  steps: Array<WorkflowStep>;

  /** Key/value data that step arguments can reference via `{{key.path}}`. */
  context: Map<string, any>;
}
/**
 * One unit of work inside a workflow.
 *
 * A step names a service and a tool on that service; the orchestrator joins
 * them as `${service}_${tool}` and dispatches the call through `executeTool`.
 */
interface WorkflowStep {
  /** Identifier of the step, unique within its workflow. */
  id: string;

  /** Id of the MCP service that should run the tool. */
  service: string;

  /** Name of the tool to invoke on that service. */
  tool: string;

  /**
   * Arguments handed to the tool. A string value of the exact form
   * `{{key.path}}` is replaced with the matching workflow-context value.
   */
  arguments: Record<string, any>;

  /** Ids of steps that must have produced a result before this one runs. */
  dependsOn?: Array<string>;

  /**
   * Retry behaviour for this step. When omitted, the orchestrator falls back
   * to a single retry with no delay.
   */
  retryPolicy?: {
    /** Number of additional attempts made after the first one fails. */
    maxRetries: number;
    /** Delay before the first retry, in milliseconds; doubled on each later retry. */
    backoffMs: number;
  };
}
/**
 * Outcome of a single workflow execution, aggregated over all of its steps.
 */
interface WorkflowResult {
  /** Id of the workflow that was executed. */
  workflowId: string;

  /** `false` as soon as any step fails; `true` otherwise. */
  success: boolean;

  /** Results of the steps that completed, keyed by step id. */
  steps: Map<string, any>;

  /** Llama-generated summary; only populated when the run succeeded. */
  finalResult?: any;

  /** Human-readable messages for steps that failed or were skipped. */
  errors?: Array<string>;
}
/**
 * Main orchestrator class that coordinates all hub operations.
 *
 * It keeps a unified, namespaced registry of the tools exposed by every
 * connected MCP service, routes tool / resource / prompt requests to the
 * owning service, runs multi-step workflows, and uses Llama for routing
 * decisions and result synthesis.
 *
 * Events emitted:
 *  - `tool:executed` `{ service, tool, success: true }` after a service tool call succeeds
 *  - `tool:failed`   `{ service, tool, error }` after a service tool call throws
 */
export class HubOrchestrator extends EventEmitter {
  private llamaService: LlamaService;
  private registry: ServiceRegistry;
  private clientManager: MCPClientManager;
  private workflows: Map<string, Workflow>;
  /**
   * Rate-limited queue (5 concurrent, at most 10 starts per second).
   * NOTE(review): constructed but not used by any method of this class.
   */
  private executionQueue: PQueue;
  /** Namespaced tool definitions, keyed by `${serviceId}_${toolName}` (plus the `hub_*` tools). */
  private toolCache: Map<string, Tool>;
  /** NOTE(review): initialised but not read or written by any method of this class. */
  private resourceCache: Map<string, Resource>;

  /**
   * @param llamaService  Llama client used for routing and synthesis prompts.
   * @param registry      Registry answering which service ids are connected.
   * @param clientManager Manager performing the actual MCP calls per service.
   */
  constructor(
    llamaService: LlamaService,
    registry: ServiceRegistry,
    clientManager: MCPClientManager
  ) {
    super();

    // Core collaborators.
    this.llamaService = llamaService;
    this.registry = registry;
    this.clientManager = clientManager;

    // Internal state.
    this.workflows = new Map();
    this.toolCache = new Map();
    this.resourceCache = new Map();

    this.executionQueue = new PQueue({
      concurrency: 5,
      interval: 1000,
      intervalCap: 10
    });

    this.initializeBuiltInWorkflows();
  }

  /**
   * Register the pre-defined workflows shipped with the hub.
   * Currently a single "customer-onboarding" workflow (Stripe + database).
   */
  private initializeBuiltInWorkflows(): void {
    this.registerWorkflow({
      id: 'customer-onboarding',
      name: 'Customer Onboarding',
      description: 'Complete customer onboarding with payment setup',
      steps: [
        {
          id: 'create-customer',
          service: 'stripe',
          tool: 'stripe_create_customer',
          arguments: {},
          retryPolicy: { maxRetries: 3, backoffMs: 1000 }
        },
        {
          id: 'setup-subscription',
          service: 'stripe',
          tool: 'stripe_create_subscription',
          arguments: {},
          dependsOn: ['create-customer']
        },
        {
          id: 'store-in-db',
          service: 'database',
          tool: 'db_insert',
          arguments: {},
          dependsOn: ['create-customer', 'setup-subscription']
        }
      ],
      context: new Map()
    });
    logger.info('Initialized built-in workflows');
  }

  /**
   * Query every registered service for its tools and cache them under a
   * namespaced name (`${serviceId}_${tool.name}`), then register the hub's
   * own orchestration tools.
   *
   * A service whose discovery fails is logged and skipped; the remaining
   * services are still processed.
   */
  async discoverAndRegisterTools(): Promise<void> {
    logger.info('Discovering tools from connected services...');
    const services = this.registry.listServices();

    for (const serviceId of services) {
      try {
        const tools = await this.clientManager.listServiceTools(serviceId);
        for (const tool of tools) {
          const namespacedName = `${serviceId}_${tool.name}`;
          const namespacedTool: Tool = {
            ...tool,
            name: namespacedName,
            // A tool may come without a description; avoid rendering "undefined".
            description: tool.description
              ? `[${serviceId}] ${tool.description}`
              : `[${serviceId}]`
          };
          this.toolCache.set(namespacedName, namespacedTool);
          logger.debug(`Registered tool: ${namespacedName}`);
        }
      } catch (error) {
        logger.error(`Failed to discover tools from ${serviceId}:`, error);
      }
    }

    this.registerHubTools();
  }

  /**
   * Add the hub-level orchestration tools (`hub_execute_workflow`,
   * `hub_smart_route`, `hub_parallel_query`) to the tool cache.
   */
  private registerHubTools(): void {
    // Tool for executing workflows
    this.toolCache.set('hub_execute_workflow', {
      name: 'hub_execute_workflow',
      description: 'Execute a multi-service workflow',
      inputSchema: {
        type: 'object',
        properties: {
          workflowId: { type: 'string', description: 'ID of the workflow to execute' },
          parameters: { type: 'object', description: 'Workflow parameters' }
        },
        required: ['workflowId']
      }
    });

    // Tool for AI-driven service selection
    this.toolCache.set('hub_smart_route', {
      name: 'hub_smart_route',
      description: 'Use Llama AI to intelligently route requests to the best service',
      inputSchema: {
        type: 'object',
        properties: {
          query: { type: 'string', description: 'Natural language request' },
          context: { type: 'object', description: 'Additional context' }
        },
        required: ['query']
      }
    });

    // Tool for parallel service queries
    this.toolCache.set('hub_parallel_query', {
      name: 'hub_parallel_query',
      description: 'Query multiple services in parallel and aggregate results',
      inputSchema: {
        type: 'object',
        properties: {
          services: {
            type: 'array',
            items: { type: 'string' },
            description: 'Services to query'
          },
          query: { type: 'string', description: 'Query to send to each service' }
        },
        required: ['services', 'query']
      }
    });
  }

  /**
   * Return every cached tool definition (service tools plus hub tools).
   * The cache is populated by `discoverAndRegisterTools`.
   */
  async getAggregatedTools(): Promise<Tool[]> {
    return Array.from(this.toolCache.values());
  }

  /**
   * Execute a tool by its namespaced name.
   *
   * Names starting with `hub_` are handled by the hub itself; everything else
   * is interpreted as `${serviceId}_${toolName}` and forwarded to that service.
   *
   * @param name Namespaced tool name.
   * @param args Arguments forwarded to the tool.
   * @returns Whatever the underlying tool returns.
   * @throws Error `Service not found: …` when no registered service matches;
   *         otherwise rethrows the error raised by the service call.
   */
  async executeTool(name: string, args: Record<string, any>): Promise<any> {
    logger.info(`Executing tool: ${name}`, { args });

    if (name.startsWith('hub_')) {
      return this.executeHubTool(name, args);
    }

    const { serviceId, localName: actualToolName } = this.resolveNamespacedName(name);
    if (!this.registry.hasService(serviceId)) {
      throw new Error(`Service not found: ${serviceId}`);
    }

    try {
      const result = await this.clientManager.executeServiceTool(
        serviceId,
        actualToolName,
        args
      );
      this.emit('tool:executed', {
        service: serviceId,
        tool: actualToolName,
        success: true
      });
      return result;
    } catch (error) {
      logger.error(`Tool execution failed: ${name}`, error);
      this.emit('tool:failed', {
        service: serviceId,
        tool: actualToolName,
        error
      });
      throw error;
    }
  }

  /**
   * Split a namespaced name (`${serviceId}_${localName}`) into its parts.
   *
   * Service ids may themselves contain underscores, so progressively longer
   * `_`-joined prefixes are checked against the registry and the first
   * (shortest) registered one wins. When no prefix is registered the first
   * segment is returned as the service id, which is what a plain split on
   * the first underscore would produce.
   */
  private resolveNamespacedName(name: string): { serviceId: string; localName: string } {
    const segments = name.split('_');
    for (let cut = 1; cut < segments.length; cut++) {
      const candidate = segments.slice(0, cut).join('_');
      if (this.registry.hasService(candidate)) {
        return { serviceId: candidate, localName: segments.slice(cut).join('_') };
      }
    }
    const [serviceId = '', ...rest] = segments;
    return { serviceId, localName: rest.join('_') };
  }

  /**
   * Dispatch one of the hub-level orchestration tools.
   *
   * @throws Error `Unknown hub tool: …` for an unrecognised `hub_*` name.
   */
  private async executeHubTool(name: string, args: Record<string, any>): Promise<any> {
    switch (name) {
      case 'hub_execute_workflow':
        return this.executeWorkflow(args.workflowId, args.parameters || {});
      case 'hub_smart_route':
        return this.smartRoute(args.query, args.context || {});
      case 'hub_parallel_query':
        return this.parallelQuery(args.services, args.query);
      default:
        throw new Error(`Unknown hub tool: ${name}`);
    }
  }

  /**
   * Execute a registered workflow.
   *
   * Steps run sequentially in dependency order. A step whose dependencies did
   * not all produce a result is skipped; a step that throws is recorded and
   * the remaining independent steps still run. In both cases the run is
   * marked unsuccessful. When every step succeeded, Llama is asked to
   * summarise the step results into `finalResult`.
   *
   * @param workflowId Id of a workflow previously passed to `registerWorkflow`.
   * @param parameters Caller-supplied values, exposed to steps as `{{parameters.…}}`.
   * @throws Error `Workflow not found: …` for an unknown id.
   */
  async executeWorkflow(workflowId: string, parameters: Record<string, any>): Promise<WorkflowResult> {
    const workflow = this.workflows.get(workflowId);
    if (!workflow) {
      throw new Error(`Workflow not found: ${workflowId}`);
    }
    logger.info(`Executing workflow: ${workflowId}`);

    const errors: string[] = [];
    const result: WorkflowResult = {
      workflowId: workflow.id,
      success: true,
      steps: new Map(),
      errors
    };

    // Work on a per-run copy so that step results from one execution never
    // leak into a later (or concurrent) execution of the same workflow.
    const context = new Map<string, any>(workflow.context);
    context.set('parameters', parameters);

    // First definition wins when ids are duplicated.
    const stepsById = new Map<string, WorkflowStep>();
    for (const step of workflow.steps) {
      if (!stepsById.has(step.id)) {
        stepsById.set(step.id, step);
      }
    }

    for (const stepId of this.topologicalSort(workflow.steps)) {
      const step = stepsById.get(stepId);
      if (!step) {
        continue;
      }

      // A dependency without a recorded result either failed, was skipped,
      // does not exist, or is part of a cycle — in every case this step
      // cannot run and the workflow did not fully succeed.
      const hasUnmetDependency = (step.dependsOn ?? []).some(
        depId => !result.steps.has(depId)
      );
      if (hasUnmetDependency) {
        result.success = false;
        errors.push(`Step ${stepId} skipped due to failed dependencies`);
        continue;
      }

      try {
        const preparedArgs = this.prepareStepArguments(step.arguments, context);
        const stepResult = await this.executeStepWithRetry(step, preparedArgs);
        result.steps.set(stepId, stepResult);
        context.set(stepId, stepResult);
      } catch (error) {
        logger.error(`Workflow step failed: ${stepId}`, error);
        result.success = false;
        errors.push(`Step ${stepId} failed: ${error}`);
        // Continue with other steps that don't depend on this one
      }
    }

    if (result.success) {
      result.finalResult = await this.synthesizeWorkflowResult(workflow, result.steps);
    }
    return result;
  }

  /**
   * Ask Llama which service/tool should handle a natural-language request and
   * execute that tool.
   *
   * @throws Error `Failed to route request intelligently` when the model's
   *         answer cannot be turned into a routing decision; errors from the
   *         selected tool propagate unchanged.
   */
  private async smartRoute(query: string, context: Record<string, any>): Promise<any> {
    logger.info('Smart routing query:', { query });

    const analysis = await this.llamaService.complete(
      [
        'Analyze this request and determine which MCP service should handle it:',
        `Query: ${query}`,
        `Available services: ${this.registry.listServices().join(', ')}`,
        `Context: ${JSON.stringify(context)}`,
        'Respond with JSON: { "service": "service_name", "tool": "tool_name", "arguments": {} }'
      ].join('\n')
    );

    let routing: { service: string; tool: string; arguments: Record<string, any> };
    try {
      routing = this.parseRoutingDecision(String(analysis));
    } catch (error) {
      logger.error('Smart routing failed:', error);
      throw new Error('Failed to route request intelligently');
    }

    return this.executeTool(`${routing.service}_${routing.tool}`, routing.arguments);
  }

  /**
   * Turn the raw model answer into a validated routing decision.
   *
   * The text is parsed as JSON; if that fails, the span from the first `{` to
   * the last `}` is tried instead (to cope with code fences or prose around
   * the object). `service` and `tool` must be non-empty strings; `arguments`
   * defaults to `{}` when it is not a plain object.
   *
   * @throws Error (or SyntaxError) when no valid decision can be extracted.
   */
  private parseRoutingDecision(
    raw: string
  ): { service: string; tool: string; arguments: Record<string, any> } {
    let parsed: unknown;
    try {
      parsed = JSON.parse(raw);
    } catch {
      const start = raw.indexOf('{');
      const end = raw.lastIndexOf('}');
      if (start === -1 || end <= start) {
        throw new Error('Routing response contains no JSON object');
      }
      parsed = JSON.parse(raw.slice(start, end + 1));
    }

    if (typeof parsed !== 'object' || parsed === null) {
      throw new Error('Routing response is not a JSON object');
    }
    const { service, tool, arguments: toolArgs } = parsed as Record<string, unknown>;
    if (typeof service !== 'string' || service === '' || typeof tool !== 'string' || tool === '') {
      throw new Error('Routing response is missing "service" or "tool"');
    }

    const isPlainObject =
      typeof toolArgs === 'object' && toolArgs !== null && !Array.isArray(toolArgs);
    return {
      service,
      tool,
      arguments: isPlainObject ? (toolArgs as Record<string, any>) : {}
    };
  }

  /**
   * Send the same query to several services concurrently and have Llama
   * summarise the combined answers.
   *
   * For each service the first tool whose name contains `query`, `search` or
   * `get` is used. Per-service failures are captured in that service's entry
   * rather than failing the whole call.
   *
   * @throws Error when `services` is not an array.
   */
  private async parallelQuery(services: string[], query: string): Promise<any> {
    if (!Array.isArray(services)) {
      throw new Error('hub_parallel_query requires a "services" array');
    }
    logger.info('Executing parallel query:', { services, query });

    const queries = services.map(async (serviceId) => {
      try {
        // Find appropriate tool for the query on this service
        const tools = await this.clientManager.listServiceTools(serviceId);
        const queryTool = tools.find(t =>
          t.name.includes('query') ||
          t.name.includes('search') ||
          t.name.includes('get')
        );
        if (!queryTool) {
          return { service: serviceId, error: 'No query tool available' };
        }
        const result = await this.clientManager.executeServiceTool(
          serviceId,
          queryTool.name,
          { query }
        );
        return { service: serviceId, result };
      } catch (error) {
        return { service: serviceId, error: String(error) };
      }
    });

    // Every entry settles; individual failures are already folded into values.
    const results = await Promise.allSettled(queries);

    const aggregated = await this.llamaService.complete(
      [
        'Synthesize these query results from multiple services:',
        JSON.stringify(results, null, 2),
        'Provide a comprehensive summary.'
      ].join('\n')
    );

    return {
      services: services,
      query: query,
      results: results,
      synthesis: aggregated
    };
  }

  /**
   * Collect the resources of every registered service.
   *
   * Each resource is namespaced: its URI becomes `${serviceId}://${uri}` and
   * its name is prefixed with `[serviceId]`. Services that fail to list
   * resources are logged and skipped.
   */
  async getAggregatedResources(): Promise<Resource[]> {
    const resources: Resource[] = [];
    for (const serviceId of this.registry.listServices()) {
      try {
        const serviceResources = await this.clientManager.listServiceResources(serviceId);
        resources.push(...serviceResources.map(r => ({
          ...r,
          uri: `${serviceId}://${r.uri}`,
          name: `[${serviceId}] ${r.name}`
        })));
      } catch (error) {
        logger.error(`Failed to get resources from ${serviceId}:`, error);
      }
    }
    return resources;
  }

  /**
   * Read a resource through the service named in its URI.
   *
   * @param uri Namespaced URI of the form `service://original-uri`.
   * @throws Error `Invalid resource URI: …` when the URI has no `service://` prefix.
   */
  async readResource(uri: string): Promise<string> {
    // Everything before the first "://" is the service id, the rest is the
    // URI as the service itself knows it.
    const match = uri.match(/^([^:]+):\/\/(.+)$/);
    if (!match) {
      throw new Error(`Invalid resource URI: ${uri}`);
    }
    const [, serviceId, actualUri] = match;
    return this.clientManager.readServiceResource(serviceId, actualUri);
  }

  /**
   * Collect the hub's own `orchestrate` prompt plus the prompts of every
   * registered service, namespaced as `${serviceId}_${prompt.name}`.
   * Services that fail to list prompts are logged and skipped.
   */
  async getAggregatedPrompts(): Promise<Prompt[]> {
    const prompts: Prompt[] = [];

    // Hub-level prompt
    prompts.push({
      name: 'orchestrate',
      description: 'Orchestrate a multi-service workflow',
      arguments: [
        {
          name: 'goal',
          description: 'What you want to accomplish',
          required: true
        }
      ]
    });

    // Prompts contributed by services
    for (const serviceId of this.registry.listServices()) {
      try {
        const servicePrompts = await this.clientManager.listServicePrompts(serviceId);
        prompts.push(...servicePrompts.map(p => ({
          ...p,
          name: `${serviceId}_${p.name}`,
          // A prompt may come without a description; avoid rendering "undefined".
          description: p.description
            ? `[${serviceId}] ${p.description}`
            : `[${serviceId}]`
        })));
      } catch (error) {
        logger.error(`Failed to get prompts from ${serviceId}:`, error);
      }
    }
    return prompts;
  }

  /**
   * Resolve a prompt by name.
   *
   * `orchestrate` is answered by the hub with a single user message carrying
   * the `goal` argument (or a default text). Any other name is treated as
   * `${serviceId}_${promptName}` and forwarded to that service.
   */
  async getPrompt(name: string, args?: Record<string, string>): Promise<any> {
    if (name === 'orchestrate') {
      return {
        messages: [
          {
            role: 'user',
            content: {
              type: 'text',
              text: args?.goal || 'Help me accomplish a complex task'
            }
          }
        ]
      };
    }

    const { serviceId, localName: actualPromptName } = this.resolveNamespacedName(name);
    return this.clientManager.getServicePrompt(serviceId, actualPromptName, args);
  }

  /**
   * Answer a completion request with Llama, passing along context gathered
   * from every registered service.
   */
  async complete(ref: any, argument: any): Promise<any> {
    const context = await this.gatherCompletionContext(ref);
    return this.llamaService.completeWithContext(argument, context);
  }

  /**
   * Register (or replace) a workflow under its `id`.
   */
  registerWorkflow(workflow: Workflow): void {
    this.workflows.set(workflow.id, workflow);
    logger.info(`Registered workflow: ${workflow.id}`);
  }

  /**
   * Order step ids so that each step appears after the steps it depends on.
   *
   * Only ids of steps present in `steps` are returned; a `dependsOn` entry
   * naming an unknown step is ignored here (the dependent step is later
   * skipped by `executeWorkflow` because that dependency never yields a
   * result). Cycles do not cause infinite recursion: each id is visited once.
   */
  private topologicalSort(steps: WorkflowStep[]): string[] {
    const stepsById = new Map<string, WorkflowStep>();
    for (const step of steps) {
      if (!stepsById.has(step.id)) {
        stepsById.set(step.id, step);
      }
    }

    const visited = new Set<string>();
    const order: string[] = [];

    const visit = (stepId: string): void => {
      if (visited.has(stepId)) return;
      visited.add(stepId);
      const step = stepsById.get(stepId);
      if (!step) return; // not a runnable step — nothing to schedule
      for (const dep of step.dependsOn ?? []) {
        visit(dep);
      }
      order.push(stepId);
    };

    for (const step of steps) {
      visit(step.id);
    }
    return order;
  }

  /**
   * Build the concrete argument object for a step.
   *
   * A top-level string value of the exact form `{{key.a.b}}` is replaced by
   * `context.get('key').a.b` (missing links yield `undefined`). All other
   * values are copied through unchanged; nested objects are not traversed.
   */
  private prepareStepArguments(
    args: Record<string, any>,
    context: Map<string, any>
  ): Record<string, any> {
    const prepared: Record<string, any> = {};
    for (const [key, value] of Object.entries(args)) {
      if (typeof value === 'string' && value.startsWith('{{') && value.endsWith('}}')) {
        // Context reference: {{stepId.field}}
        const ref = value.slice(2, -2);
        const [contextKey, ...path] = ref.split('.');
        let contextValue = context.get(contextKey);
        for (const p of path) {
          contextValue = contextValue?.[p];
        }
        prepared[key] = contextValue;
      } else {
        prepared[key] = value;
      }
    }
    return prepared;
  }

  /**
   * Run a step, retrying on failure with exponential backoff.
   *
   * The step is attempted once and then up to `maxRetries` more times; the
   * delay before retry *n* is `backoffMs * 2^(n-1)`. Without a policy the
   * step gets one immediate retry. A negative or NaN `maxRetries` is treated
   * as zero so that the step is always attempted at least once.
   *
   * @throws The error of the last failed attempt.
   */
  private async executeStepWithRetry(
    step: WorkflowStep,
    args: Record<string, any>
  ): Promise<any> {
    const retryPolicy = step.retryPolicy ?? { maxRetries: 1, backoffMs: 0 };
    const maxRetries = Number.isNaN(retryPolicy.maxRetries)
      ? 0
      : Math.max(0, retryPolicy.maxRetries);

    let lastError: unknown;
    for (let attempt = 0; attempt <= maxRetries; attempt++) {
      try {
        if (attempt > 0) {
          // Exponential backoff
          const delayMs = retryPolicy.backoffMs * 2 ** (attempt - 1);
          await new Promise(resolve => setTimeout(resolve, delayMs));
        }
        return await this.executeTool(`${step.service}_${step.tool}`, args);
      } catch (error) {
        lastError = error;
        logger.warn(`Step ${step.id} attempt ${attempt + 1} failed:`, error);
      }
    }
    // Never surface a bare `undefined` to the caller.
    throw lastError ?? new Error(`Step ${step.id} failed without an error`);
  }

  /**
   * Ask Llama for a summary of a workflow run, given the per-step results.
   */
  private async synthesizeWorkflowResult(
    workflow: Workflow,
    steps: Map<string, any>
  ): Promise<any> {
    const stepLines = Array.from(steps.entries())
      .map(([id, result]) => `${id}: ${JSON.stringify(result, null, 2)}`)
      .join('\n');
    const prompt = [
      'Synthesize the results of this workflow execution:',
      `Workflow: ${workflow.name}`,
      `Description: ${workflow.description}`,
      'Step Results:',
      stepLines,
      'Provide a comprehensive summary of what was accomplished.'
    ].join('\n');
    return this.llamaService.complete(prompt);
  }

  /**
   * Collect per-service context for a completion request.
   *
   * Services that fail are logged and left out of the returned object.
   * `ref` is currently not used to narrow what is gathered.
   */
  private async gatherCompletionContext(ref: any): Promise<Record<string, any>> {
    const context: Record<string, any> = {};
    for (const serviceId of this.registry.listServices()) {
      try {
        context[serviceId] = await this.clientManager.getServiceContext(serviceId);
      } catch (error) {
        logger.warn(`Failed to gather context from ${serviceId}:`, error);
      }
    }
    return context;
  }
}