Skip to main content
Glama
agent-integration-bridge.ts25.5 kB
/** * Agent Integration Bridge * * Provides unified integration between Agent Registry and Agent Orchestrator * Implements synchronization, data model conversion, and unified registration */ import { AgentInfo, AgentCapability } from './agent-orchestrator.js'; import { AppError } from '../../../utils/errors.js'; import { dependencyContainer } from '../../../services/dependency-container.js'; import logger from '../../../logger.js'; // Import AgentRegistration type without importing the module export interface AgentRegistration { agentId: string; capabilities: string[]; transportType: 'stdio' | 'sse' | 'websocket' | 'http'; sessionId: string; maxConcurrentTasks: number; pollingInterval?: number; status?: 'online' | 'offline' | 'busy'; registeredAt?: number; lastSeen?: number; currentTasks?: string[]; websocketConnection?: WebSocket; httpEndpoint?: string; httpAuthToken?: string; } /** * Unified agent interface that bridges registry and orchestrator models */ export interface UnifiedAgent { // Core identification id: string; name?: string; // Capabilities (unified format) capabilities: string[]; // Status (unified format) status: 'online' | 'offline' | 'busy' | 'available' | 'error'; // Task management maxConcurrentTasks: number; currentTasks: string[]; // Communication transportType?: 'stdio' | 'sse' | 'websocket' | 'http'; sessionId?: string; pollingInterval?: number; // Timing registeredAt: number; lastSeen: number; lastHeartbeat: Date; // Performance performance: { tasksCompleted: number; averageCompletionTime: number; successRate: number; lastTaskCompletedAt?: Date; }; // Transport-specific httpEndpoint?: string; httpAuthToken?: string; websocketConnection?: WebSocket; // Metadata metadata: { version: string; supportedProtocols: string[]; preferences: Record<string, unknown>; }; } /** * Agent Integration Bridge Service * Manages synchronization between Agent Registry and Agent Orchestrator */ export class AgentIntegrationBridge { private static instance: AgentIntegrationBridge; private static isInitializing = false; // Initialization guard to prevent circular initialization // eslint-disable-next-line @typescript-eslint/no-explicit-any private agentRegistry: any | null = null; // Dynamic import from dependency container // eslint-disable-next-line @typescript-eslint/no-explicit-any private agentOrchestrator: any | null = null; // Dynamic import from dependency container private syncEnabled = true; private syncInterval?: NodeJS.Timeout; private registrationInProgress = new Set<string>(); // Prevent circular registration private constructor() { this.initializeDependencies(); } static getInstance(): AgentIntegrationBridge { if (AgentIntegrationBridge.isInitializing) { logger.warn('Circular initialization detected in AgentIntegrationBridge, using safe fallback'); return AgentIntegrationBridge.createSafeFallback(); } if (!AgentIntegrationBridge.instance) { AgentIntegrationBridge.isInitializing = true; try { AgentIntegrationBridge.instance = new AgentIntegrationBridge(); } finally { AgentIntegrationBridge.isInitializing = false; } } return AgentIntegrationBridge.instance; } /** * Create safe fallback instance to prevent recursion */ private static createSafeFallback(): AgentIntegrationBridge { const fallback = Object.create(AgentIntegrationBridge.prototype); // Initialize with minimal safe properties fallback.agentRegistry = null; fallback.agentOrchestrator = null; fallback.syncEnabled = false; fallback.registrationInProgress = new Set(); // Provide safe no-op methods fallback.registerAgent = async () => { logger.warn('AgentIntegrationBridge fallback: registerAgent called during initialization'); }; fallback.synchronizeAgents = async () => { logger.warn('AgentIntegrationBridge fallback: synchronizeAgents called during initialization'); }; fallback.getUnifiedAgent = async () => { logger.warn('AgentIntegrationBridge fallback: getUnifiedAgent called during initialization'); return null; }; fallback.startAutoSync = () => { logger.warn('AgentIntegrationBridge fallback: startAutoSync called during initialization'); }; return fallback; } /** * Initialize dependencies using dependency container */ private async initializeDependencies(): Promise<void> { try { // Initialize agent registry using dependency container this.agentRegistry = await dependencyContainer.getAgentRegistry(); if (!this.agentRegistry) { logger.warn('AgentRegistry not available'); } // Import and initialize agent orchestrator const { AgentOrchestrator } = await import('./agent-orchestrator.js'); this.agentOrchestrator = AgentOrchestrator.getInstance(); logger.info('Agent integration bridge dependencies initialized'); } catch (error) { logger.error({ err: error }, 'Failed to initialize agent integration bridge dependencies'); throw new AppError('Agent integration bridge initialization failed', { cause: error }); } } /** * Convert Agent Registry format to Orchestrator format */ convertRegistryToOrchestrator(registryAgent: AgentRegistration): Omit<AgentInfo, 'lastHeartbeat' | 'performance'> { // Map capabilities to orchestrator format const orchestratorCapabilities = this.mapCapabilities(registryAgent.capabilities); // Map status const orchestratorStatus = this.mapRegistryStatusToOrchestrator(registryAgent.status || 'online'); return { id: registryAgent.agentId, name: registryAgent.agentId, // Use agentId as name if no name provided capabilities: orchestratorCapabilities, maxConcurrentTasks: registryAgent.maxConcurrentTasks, currentTasks: registryAgent.currentTasks || [], status: orchestratorStatus, metadata: { version: '1.0.0', supportedProtocols: [registryAgent.transportType], preferences: { transportType: registryAgent.transportType, sessionId: registryAgent.sessionId, pollingInterval: registryAgent.pollingInterval, httpEndpoint: registryAgent.httpEndpoint, httpAuthToken: registryAgent.httpAuthToken } } }; } /** * Convert Orchestrator format to Registry format */ convertOrchestratorToRegistry(orchestratorAgent: AgentInfo): AgentRegistration { // Extract transport info from metadata const transportType = orchestratorAgent.metadata.preferences?.transportType || 'stdio'; const sessionId = orchestratorAgent.metadata.preferences?.sessionId || `session-${orchestratorAgent.id}`; // Map status const registryStatus = this.mapOrchestratorStatusToRegistry(orchestratorAgent.status); return { agentId: orchestratorAgent.id, capabilities: orchestratorAgent.capabilities.map(cap => cap.toString()), transportType: transportType as 'stdio' | 'sse' | 'websocket' | 'http', sessionId: sessionId as string, maxConcurrentTasks: orchestratorAgent.maxConcurrentTasks, pollingInterval: (orchestratorAgent.metadata?.preferences?.pollingInterval as number) || 5000, status: registryStatus, registeredAt: Date.now(), lastSeen: orchestratorAgent.lastHeartbeat.getTime(), currentTasks: orchestratorAgent.currentTasks, httpEndpoint: orchestratorAgent.metadata?.preferences?.httpEndpoint as string | undefined, httpAuthToken: orchestratorAgent.metadata?.preferences?.httpAuthToken as string | undefined }; } /** * Map capabilities between formats */ private mapCapabilities(capabilities: string[]): AgentCapability[] { const capabilityMap: Record<string, AgentCapability> = { 'code_generation': 'general', 'frontend': 'frontend', 'backend': 'backend', 'database': 'database', 'testing': 'testing', 'devops': 'devops', 'deployment': 'devops', 'documentation': 'documentation', 'refactoring': 'refactoring', 'debugging': 'debugging', 'review': 'general', 'research': 'general', 'optimization': 'general', 'analysis': 'general' }; return capabilities.map(cap => capabilityMap[cap] || 'general'); } /** * Map registry status to orchestrator status */ private mapRegistryStatusToOrchestrator(registryStatus: string): AgentInfo['status'] { const statusMap: Record<string, AgentInfo['status']> = { 'online': 'available', 'offline': 'offline', 'busy': 'busy' }; return statusMap[registryStatus] || 'available'; } /** * Map orchestrator status to registry status */ private mapOrchestratorStatusToRegistry(orchestratorStatus: AgentInfo['status']): AgentRegistration['status'] { const statusMap: Record<AgentInfo['status'], AgentRegistration['status']> = { 'available': 'online', 'busy': 'busy', 'offline': 'offline', 'error': 'offline' }; return statusMap[orchestratorStatus] || 'online'; } /** * Unified agent registration that updates both systems */ async registerAgent(agentData: Partial<UnifiedAgent> & { id: string; capabilities: string[] }): Promise<void> { // Prevent circular registration if (this.registrationInProgress.has(agentData.id)) { logger.debug({ agentId: agentData.id }, 'Agent registration already in progress, skipping to prevent circular registration'); return; } this.registrationInProgress.add(agentData.id); try { // Ensure dependencies are initialized if (!this.agentRegistry || !this.agentOrchestrator) { await this.initializeDependencies(); } // Create unified agent data with defaults const unifiedAgent: UnifiedAgent = { id: agentData.id, name: agentData.name || agentData.id, capabilities: agentData.capabilities, status: agentData.status || 'online', maxConcurrentTasks: agentData.maxConcurrentTasks || 1, currentTasks: agentData.currentTasks || [], transportType: agentData.transportType || 'stdio', sessionId: agentData.sessionId || `session-${agentData.id}`, pollingInterval: agentData.pollingInterval || 5000, registeredAt: agentData.registeredAt || Date.now(), lastSeen: agentData.lastSeen || Date.now(), lastHeartbeat: agentData.lastHeartbeat || new Date(), performance: agentData.performance || { tasksCompleted: 0, averageCompletionTime: 0, successRate: 1.0 }, httpEndpoint: agentData.httpEndpoint, httpAuthToken: agentData.httpAuthToken, websocketConnection: agentData.websocketConnection, metadata: agentData.metadata || { version: '1.0.0', supportedProtocols: [agentData.transportType || 'stdio'], preferences: {} } }; // Register in agent registry (without triggering bridge) const registryData = this.convertUnifiedToRegistry(unifiedAgent); await this.registerInRegistryOnly(registryData); // Register in agent orchestrator (without triggering bridge) const orchestratorData = this.convertUnifiedToOrchestrator(unifiedAgent); await this.registerInOrchestratorOnly(orchestratorData); logger.info({ agentId: agentData.id }, 'Agent registered in both registry and orchestrator via integration bridge'); } catch (error) { logger.error({ err: error, agentId: agentData.id }, 'Failed to register agent in unified system'); throw new AppError('Unified agent registration failed', { cause: error }); } finally { // Always remove from in-progress set this.registrationInProgress.delete(agentData.id); } } /** * Convert unified format to registry format */ private convertUnifiedToRegistry(unifiedAgent: UnifiedAgent): AgentRegistration { return { agentId: unifiedAgent.id, capabilities: unifiedAgent.capabilities, transportType: unifiedAgent.transportType!, sessionId: unifiedAgent.sessionId!, maxConcurrentTasks: unifiedAgent.maxConcurrentTasks, pollingInterval: unifiedAgent.pollingInterval, status: unifiedAgent.status === 'available' ? 'online' : unifiedAgent.status === 'error' ? 'offline' : (unifiedAgent.status as 'online' | 'offline' | 'busy'), registeredAt: unifiedAgent.registeredAt, lastSeen: unifiedAgent.lastSeen, currentTasks: unifiedAgent.currentTasks, httpEndpoint: unifiedAgent.httpEndpoint, httpAuthToken: unifiedAgent.httpAuthToken, websocketConnection: unifiedAgent.websocketConnection }; } /** * Convert unified format to orchestrator format */ private convertUnifiedToOrchestrator(unifiedAgent: UnifiedAgent): Omit<AgentInfo, 'lastHeartbeat' | 'performance'> { return { id: unifiedAgent.id, name: unifiedAgent.name!, capabilities: this.mapCapabilities(unifiedAgent.capabilities), maxConcurrentTasks: unifiedAgent.maxConcurrentTasks, currentTasks: unifiedAgent.currentTasks, status: unifiedAgent.status === 'online' ? 'available' : (unifiedAgent.status as 'available' | 'busy' | 'offline' | 'error'), metadata: unifiedAgent.metadata }; } /** * Register agent in registry only (without triggering bridge) */ private async registerInRegistryOnly(registryData: AgentRegistration): Promise<void> { if (!this.agentRegistry) { throw new AppError('Agent registry not initialized'); } // Create a direct registration method that bypasses bridge const directRegister = async (data: AgentRegistration) => { // Call the original registry logic without bridge integration this.agentRegistry!.validateRegistration(data); const existingAgent = this.agentRegistry.agents?.get(data.agentId); if (existingAgent) { await this.agentRegistry.updateAgent(data); } else { await this.agentRegistry.createAgent(data); } this.agentRegistry.sessionToAgent?.set(data.sessionId, data.agentId); }; await directRegister(registryData); } /** * Register agent in orchestrator only (without triggering bridge) */ private async registerInOrchestratorOnly(orchestratorData: Omit<AgentInfo, 'lastHeartbeat' | 'performance'>): Promise<void> { if (!this.agentOrchestrator) { throw new AppError('Agent orchestrator not initialized'); } // Direct registration in orchestrator without triggering bridge const fullAgentInfo = { ...orchestratorData, lastHeartbeat: new Date(), performance: { tasksCompleted: 0, averageCompletionTime: 0, successRate: 1.0 } }; this.agentOrchestrator.agents?.set(orchestratorData.id, fullAgentInfo); } /** * Synchronize agents between registry and orchestrator */ async synchronizeAgents(): Promise<void> { if (!this.syncEnabled) return; try { // Ensure dependencies are initialized if (!this.agentRegistry || !this.agentOrchestrator) { await this.initializeDependencies(); } // Get agents from both systems const registryAgents = await this.agentRegistry.getAllAgents(); const orchestratorAgents = await this.agentOrchestrator.getAgents(); // Create maps for efficient lookup const registryMap = new Map(registryAgents.map((agent: AgentRegistration) => [agent.agentId, agent])); const orchestratorMap = new Map(orchestratorAgents.map((agent: AgentInfo) => [agent.id, agent])); // Sync registry agents to orchestrator for (const registryAgent of registryAgents) { if (!orchestratorMap.has(registryAgent.agentId)) { const orchestratorData = this.convertRegistryToOrchestrator(registryAgent); await this.agentOrchestrator.registerAgent(orchestratorData); logger.debug({ agentId: registryAgent.agentId }, 'Synced registry agent to orchestrator'); } } // Sync orchestrator agents to registry for (const orchestratorAgent of orchestratorAgents) { if (!registryMap.has(orchestratorAgent.id)) { const registryData = this.convertOrchestratorToRegistry(orchestratorAgent); await this.agentRegistry.registerAgent(registryData); logger.debug({ agentId: orchestratorAgent.id }, 'Synced orchestrator agent to registry'); } } logger.debug('Agent synchronization completed'); } catch (error) { logger.error({ err: error }, 'Failed to synchronize agents'); } } /** * Start automatic synchronization */ startAutoSync(intervalMs: number = 30000): void { if (this.syncInterval) { clearInterval(this.syncInterval); } this.syncInterval = setInterval(() => { this.synchronizeAgents().catch(error => { logger.error({ err: error }, 'Auto-sync failed'); }); }, intervalMs); logger.info({ intervalMs }, 'Agent auto-synchronization started'); } /** * Stop automatic synchronization */ stopAutoSync(): void { if (this.syncInterval) { clearInterval(this.syncInterval); this.syncInterval = undefined; logger.info('Agent auto-synchronization stopped'); } } /** * Immediately propagate agent status change to all systems */ async propagateStatusChange( agentId: string, newStatus: 'online' | 'offline' | 'busy' | 'available' | 'error', source: 'registry' | 'orchestrator' ): Promise<void> { try { // Ensure dependencies are initialized if (!this.agentRegistry || !this.agentOrchestrator) { await this.initializeDependencies(); } logger.debug({ agentId, newStatus, source }, 'Propagating status change'); if (source === 'orchestrator') { // Update registry status const registryStatus = this.mapOrchestratorStatusToRegistry(newStatus as AgentInfo['status']); await this.agentRegistry.updateAgentStatus(agentId, registryStatus); logger.debug({ agentId, registryStatus }, 'Status propagated from orchestrator to registry'); } else if (source === 'registry') { // Update orchestrator status const orchestratorStatus = this.mapRegistryStatusToOrchestrator(newStatus); const agent = this.agentOrchestrator.agents?.get(agentId); if (agent) { agent.status = orchestratorStatus; agent.lastHeartbeat = new Date(); logger.debug({ agentId, orchestratorStatus }, 'Status propagated from registry to orchestrator'); } } logger.info({ agentId, newStatus, source }, 'Agent status propagated successfully'); } catch (error) { logger.error({ err: error, agentId, newStatus, source }, 'Failed to propagate status change'); } } /** * Immediately propagate task assignment status change */ async propagateTaskStatusChange( agentId: string, taskId: string, taskStatus: 'assigned' | 'in_progress' | 'completed' | 'failed', source: 'registry' | 'orchestrator' ): Promise<void> { try { // Ensure dependencies are initialized if (!this.agentRegistry || !this.agentOrchestrator) { await this.initializeDependencies(); } logger.debug({ agentId, taskId, taskStatus, source }, 'Propagating task status change'); // Update agent's current tasks list in both systems if (source === 'orchestrator') { // Update registry const registryAgent = await this.agentRegistry.getAgent(agentId); if (registryAgent) { if (taskStatus === 'assigned' || taskStatus === 'in_progress') { if (!registryAgent.currentTasks?.includes(taskId)) { registryAgent.currentTasks = [...(registryAgent.currentTasks || []), taskId]; } } else if (taskStatus === 'completed' || taskStatus === 'failed') { registryAgent.currentTasks = (registryAgent.currentTasks || []).filter((id: string) => id !== taskId); } // Update agent status based on task load const taskCount = registryAgent.currentTasks?.length || 0; const maxTasks = registryAgent.maxConcurrentTasks || 1; const newStatus = taskCount >= maxTasks ? 'busy' : 'online'; await this.agentRegistry.updateAgentStatus(agentId, newStatus); } } else if (source === 'registry') { // Update orchestrator const orchestratorAgent = this.agentOrchestrator.agents?.get(agentId); if (orchestratorAgent) { if (taskStatus === 'assigned' || taskStatus === 'in_progress') { if (!orchestratorAgent.currentTasks.includes(taskId)) { orchestratorAgent.currentTasks.push(taskId); } } else if (taskStatus === 'completed' || taskStatus === 'failed') { orchestratorAgent.currentTasks = orchestratorAgent.currentTasks.filter((id: string) => id !== taskId); } // Update agent status based on task load const taskCount = orchestratorAgent.currentTasks.length; const maxTasks = orchestratorAgent.maxConcurrentTasks || 1; orchestratorAgent.status = taskCount >= maxTasks ? 'busy' : 'available'; orchestratorAgent.lastHeartbeat = new Date(); } } logger.debug({ agentId, taskId, taskStatus, source }, 'Task status propagated successfully'); } catch (error) { logger.error({ err: error, agentId, taskId, taskStatus, source }, 'Failed to propagate task status change'); } } /** * Enable/disable synchronization */ setSyncEnabled(enabled: boolean): void { this.syncEnabled = enabled; logger.info({ enabled }, 'Agent synchronization enabled/disabled'); } /** * Get unified agent by ID from either system */ async getUnifiedAgent(agentId: string): Promise<UnifiedAgent | null> { try { // Try registry first const registryAgent = await this.agentRegistry?.getAgent(agentId); if (registryAgent) { return this.convertRegistryToUnified(registryAgent); } // Try orchestrator const orchestratorAgents = await this.agentOrchestrator?.getAgents(); const orchestratorAgent = orchestratorAgents?.find((agent: AgentInfo) => agent.id === agentId); if (orchestratorAgent) { return this.convertOrchestratorToUnified(orchestratorAgent); } return null; } catch (error) { logger.error({ err: error, agentId }, 'Failed to get unified agent'); return null; } } /** * Convert registry agent to unified format */ private convertRegistryToUnified(registryAgent: AgentRegistration): UnifiedAgent { return { id: registryAgent.agentId, name: registryAgent.agentId, capabilities: registryAgent.capabilities, status: registryAgent.status === 'online' ? 'available' : (registryAgent.status as 'available' | 'busy' | 'offline' | 'error'), maxConcurrentTasks: registryAgent.maxConcurrentTasks, currentTasks: registryAgent.currentTasks || [], transportType: registryAgent.transportType, sessionId: registryAgent.sessionId, pollingInterval: registryAgent.pollingInterval, registeredAt: registryAgent.registeredAt || Date.now(), lastSeen: registryAgent.lastSeen || Date.now(), lastHeartbeat: new Date(registryAgent.lastSeen || Date.now()), performance: { tasksCompleted: 0, averageCompletionTime: 0, successRate: 1.0 }, httpEndpoint: registryAgent.httpEndpoint, httpAuthToken: registryAgent.httpAuthToken, websocketConnection: registryAgent.websocketConnection, metadata: { version: '1.0.0', supportedProtocols: [registryAgent.transportType], preferences: {} } }; } /** * Convert orchestrator agent to unified format */ private convertOrchestratorToUnified(orchestratorAgent: AgentInfo): UnifiedAgent { return { id: orchestratorAgent.id, name: orchestratorAgent.name, capabilities: orchestratorAgent.capabilities.map(cap => cap.toString()), status: orchestratorAgent.status === 'available' ? 'online' : (orchestratorAgent.status as 'online' | 'offline' | 'busy'), maxConcurrentTasks: orchestratorAgent.maxConcurrentTasks, currentTasks: orchestratorAgent.currentTasks, transportType: (orchestratorAgent.metadata?.preferences?.transportType as 'stdio' | 'sse' | 'websocket' | 'http') || 'stdio', sessionId: orchestratorAgent.metadata?.preferences?.sessionId as string | undefined, pollingInterval: orchestratorAgent.metadata?.preferences?.pollingInterval as number | undefined, registeredAt: Date.now(), lastSeen: orchestratorAgent.lastHeartbeat.getTime(), lastHeartbeat: orchestratorAgent.lastHeartbeat, performance: orchestratorAgent.performance, httpEndpoint: orchestratorAgent.metadata?.preferences?.httpEndpoint as string | undefined, httpAuthToken: orchestratorAgent.metadata?.preferences?.httpAuthToken as string | undefined, metadata: orchestratorAgent.metadata }; } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/freshtechbro/vibe-coder-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server