Skip to main content
Glama
by Coder-RL
service-discovery.ts (17.6 kB)
import { ServiceRegistry, ServiceInfo } from './service-registry.js';
import { MessageBus, MessagePayload } from './message-bus.js';
import { RedisConnectionManager } from '../../database/redis-client.js';
import { getLogger } from '../../shared/logger.js';

const logger = getLogger('ServiceDiscovery');

/**
 * Third argument handed to `redis.setJSON` for every cached endpoint / group.
 * NOTE(review): presumably a TTL in seconds — confirm against
 * `RedisConnectionManager.setJSON`.
 */
const CACHE_TTL = 3600;

/** Period of the background health-check timer, in milliseconds. */
const HEALTH_CHECK_PERIOD_MS = 15000;

/** An endpoint counts as "recently checked" if its last check is younger than this (ms). */
const FRESH_HEALTH_WINDOW_MS = 30000;

/** Name of the stream and consumer group used for service lifecycle events. */
const SERVICE_EVENTS_STREAM = 'service-events';
const SERVICE_EVENTS_GROUP = 'service-discovery';

/** A single network-addressable instance of a service, as tracked by discovery. */
export interface ServiceEndpoint {
  id: string;
  serviceName: string;
  host: string;
  port: number;
  protocol: 'http' | 'https' | 'stdio' | 'ws';
  path?: string;
  /** Relative share used by the weighted selection strategies. */
  weight: number;
  status: 'healthy' | 'unhealthy' | 'draining';
  metadata: Record<string, any>;
  lastHealthCheck: Date;
}

/** Filter criteria accepted by {@link ServiceDiscovery.discoverServices}. */
export interface DiscoveryQuery {
  /** Case-insensitive substring of the service name. */
  serviceName?: string;
  /** Matches endpoints that advertise at least one of these capabilities. */
  capabilities?: string[];
  version?: string;
  /** NOTE(review): accepted but currently not applied by `discoverServices`. */
  tags?: string[];
  /** Defaults to healthy-only; pass `false` explicitly to include other states. */
  healthyOnly?: boolean;
  protocol?: string;
  /** Every key/value pair must strictly equal the endpoint's metadata entry. */
  metadata?: Record<string, any>;
}

/** Identifies the load-balancing algorithm used when picking one endpoint. */
export interface LoadBalancingStrategy {
  name: 'round-robin' | 'weighted' | 'least-connections' | 'random' | 'health-aware';
  options?: Record<string, any>;
}

/** A named snapshot of endpoints together with its balancing configuration. */
export interface ServiceGroup {
  name: string;
  services: ServiceEndpoint[];
  strategy: LoadBalancingStrategy;
  healthCheckInterval: number;
  failureThreshold: number;
}

/** Counters exposed through {@link ServiceDiscovery.getMetrics}. */
export interface DiscoveryMetrics {
  totalQueries: number;
  successfulQueries: number;
  failedQueries: number;
  /** Running mean over successful queries only, in milliseconds. */
  averageQueryTime: number;
  totalEndpoints: number;
  healthyEndpoints: number;
  unhealthyEndpoints: number;
  servicesDiscovered: number;
}

/**
 * In-memory view of the service registry with query, load-balancing and
 * service-group helpers.
 *
 * Endpoints are seeded from the registry on `initialize()`, kept current via
 * the `service-events` stream, and re-validated against the registry on a
 * timer. Endpoint and group snapshots are mirrored to Redis.
 *
 * Constructing an instance installs SIGTERM/SIGINT listeners that trigger
 * `shutdown()`; they are removed again once shutdown runs.
 */
export class ServiceDiscovery {
  private readonly registry: ServiceRegistry;
  private readonly messageBus: MessageBus;
  private readonly redis: RedisConnectionManager;

  private readonly endpoints: Map<string, ServiceEndpoint> = new Map();
  private readonly serviceGroups: Map<string, ServiceGroup> = new Map();
  /** Open-connection count per endpoint id, maintained by `trackConnection`. */
  private readonly connectionTracking: Map<string, number> = new Map();
  /** Monotonic pick counter per service name for round-robin selection. */
  private readonly roundRobinCounters: Map<string, number> = new Map();

  private metrics: DiscoveryMetrics = {
    totalQueries: 0,
    successfulQueries: 0,
    failedQueries: 0,
    averageQueryTime: 0,
    totalEndpoints: 0,
    healthyEndpoints: 0,
    unhealthyEndpoints: 0,
    servicesDiscovered: 0,
  };

  private healthCheckInterval: NodeJS.Timeout | null = null;
  private shutdownPromise: Promise<void> | null = null;

  /**
   * Single listener shared by both signals. Kept as a field so the exact same
   * function reference can be removed during shutdown, and so a rejected
   * shutdown is logged instead of becoming an unhandled rejection.
   */
  private readonly signalHandler = (): void => {
    this.shutdown().catch((error: unknown) => {
      logger.error('Service Discovery shutdown failed', { error });
    });
  };

  constructor(
    registry: ServiceRegistry,
    messageBus: MessageBus,
    redis: RedisConnectionManager
  ) {
    this.registry = registry;
    this.messageBus = messageBus;
    this.redis = redis;
    this.setupSignalHandlers();
  }

  /** Registers the shutdown listener for SIGTERM and SIGINT. */
  private setupSignalHandlers(): void {
    process.on('SIGTERM', this.signalHandler);
    process.on('SIGINT', this.signalHandler);
  }

  /**
   * Loads endpoints from the registry, subscribes to service events and starts
   * the periodic health check.
   *
   * @throws Re-throws whatever the failing step threw, after logging it.
   */
  async initialize(): Promise<void> {
    try {
      await this.loadServiceEndpoints();
      await this.subscribeToServiceEvents();
      await this.startHealthChecking();
      logger.info('Service Discovery initialized successfully');
    } catch (error) {
      logger.error('Failed to initialize Service Discovery', { error });
      throw error;
    }
  }

  /** Seeds the endpoint map from every service currently in the registry. */
  private async loadServiceEndpoints(): Promise<void> {
    try {
      const services = this.registry.getAllServices();
      for (const service of services) {
        const endpoint = this.serviceInfoToEndpoint(service);
        this.endpoints.set(endpoint.id, endpoint);
      }
      this.updateMetrics();
      logger.info(`Loaded ${this.endpoints.size} service endpoints`);
    } catch (error) {
      logger.error('Failed to load service endpoints', { error });
      throw error;
    }
  }

  /**
   * Maps a registry record onto an endpoint. Any registry status other than
   * `'healthy'` becomes `'unhealthy'`; weight is fixed at 100.
   */
  private serviceInfoToEndpoint(service: ServiceInfo): ServiceEndpoint {
    return {
      id: service.id,
      serviceName: service.name,
      host: service.host,
      port: service.port,
      protocol: service.protocol,
      weight: 100,
      status: service.status === 'healthy' ? 'healthy' : 'unhealthy',
      metadata: {
        version: service.version,
        capabilities: service.capabilities,
        registeredAt: service.registeredAt,
      },
      lastHealthCheck: new Date(),
    };
  }

  /** Subscribes this process as a consumer of the `service-events` stream. */
  private async subscribeToServiceEvents(): Promise<void> {
    try {
      await this.messageBus.subscribeToStream(
        SERVICE_EVENTS_STREAM,
        {
          group: SERVICE_EVENTS_GROUP,
          consumer: `discovery-${process.pid}`,
          count: 10,
          block: 1000,
        },
        this.handleServiceEvent.bind(this)
      );
      logger.info('Subscribed to service events');
    } catch (error) {
      logger.error('Failed to subscribe to service events', { error });
      throw error;
    }
  }

  /**
   * Dispatches one stream message to the matching handler. Handler failures
   * are logged and swallowed so one bad event does not break the subscription.
   */
  private async handleServiceEvent(message: MessagePayload): Promise<void> {
    try {
      const { type, data } = message;
      switch (type) {
        case 'service.registered':
          await this.handleServiceRegistered(data);
          break;
        case 'service.deregistered':
          await this.handleServiceDeregistered(data);
          break;
        case 'service.health_changed':
          await this.handleServiceHealthChanged(data);
          break;
        default:
          logger.debug(`Unknown service event type: ${type}`);
      }
    } catch (error) {
      logger.error('Failed to handle service event', { error, message });
    }
  }

  /** Adds (or replaces) the endpoint for a newly registered service and caches it. */
  private async handleServiceRegistered(serviceInfo: ServiceInfo): Promise<void> {
    const endpoint = this.serviceInfoToEndpoint(serviceInfo);
    this.endpoints.set(endpoint.id, endpoint);
    await this.redis.setJSON(`endpoint:${endpoint.id}`, endpoint, CACHE_TTL);
    this.updateMetrics();
    logger.info(`Added endpoint for service registration`, {
      endpointId: endpoint.id,
      serviceName: endpoint.serviceName,
      address: `${endpoint.host}:${endpoint.port}`,
    });
  }

  /** Drops a deregistered endpoint from memory, connection tracking and Redis. */
  private async handleServiceDeregistered(serviceInfo: ServiceInfo): Promise<void> {
    const endpointId = serviceInfo.id;
    const endpoint = this.endpoints.get(endpointId);
    if (!endpoint) {
      return;
    }

    this.endpoints.delete(endpointId);
    // Without this the connection counter for a vanished endpoint would be
    // retained forever.
    this.connectionTracking.delete(endpointId);
    await this.redis.getClient().del(`endpoint:${endpointId}`);
    this.updateMetrics();
    logger.info(`Removed endpoint for service deregistration`, {
      endpointId,
      serviceName: endpoint.serviceName,
    });
  }

  /** Applies a pushed health update to a known endpoint and re-caches it. */
  private async handleServiceHealthChanged(data: {
    serviceId: string;
    status: string;
  }): Promise<void> {
    const endpoint = this.endpoints.get(data.serviceId);
    if (!endpoint) {
      return;
    }

    const oldStatus = endpoint.status;
    endpoint.status = data.status === 'healthy' ? 'healthy' : 'unhealthy';
    endpoint.lastHealthCheck = new Date();
    await this.redis.setJSON(`endpoint:${endpoint.id}`, endpoint, CACHE_TTL);

    if (oldStatus !== endpoint.status) {
      logger.info(`Endpoint health status changed`, {
        endpointId: endpoint.id,
        serviceName: endpoint.serviceName,
        oldStatus,
        newStatus: endpoint.status,
      });
      this.updateMetrics();
    }
  }

  /**
   * Returns every known endpoint that satisfies all supplied criteria.
   *
   * - `serviceName` is a case-insensitive substring match.
   * - `capabilities` matches when the endpoint has at least one of them.
   * - Unless `healthyOnly` is explicitly `false`, only healthy endpoints pass.
   * - `tags` is currently ignored.
   *
   * @throws Re-throws any error raised while filtering, after recording a
   *         failed query.
   */
  async discoverServices(query: DiscoveryQuery): Promise<ServiceEndpoint[]> {
    const startTime = Date.now();
    try {
      this.metrics.totalQueries++;
      let matchingEndpoints = Array.from(this.endpoints.values());

      if (query.serviceName) {
        const needle = query.serviceName.toLowerCase();
        matchingEndpoints = matchingEndpoints.filter(ep =>
          ep.serviceName.toLowerCase().includes(needle)
        );
      }

      if (query.protocol) {
        const protocol = query.protocol;
        matchingEndpoints = matchingEndpoints.filter(ep => ep.protocol === protocol);
      }

      if (query.capabilities && query.capabilities.length > 0) {
        const wanted = query.capabilities;
        matchingEndpoints = matchingEndpoints.filter(ep => {
          const epCapabilities = ep.metadata.capabilities || [];
          return wanted.some(cap => epCapabilities.includes(cap));
        });
      }

      if (query.version) {
        const version = query.version;
        matchingEndpoints = matchingEndpoints.filter(ep => ep.metadata.version === version);
      }

      if (query.healthyOnly !== false) {
        matchingEndpoints = matchingEndpoints.filter(ep => ep.status === 'healthy');
      }

      if (query.metadata) {
        const required = Object.entries(query.metadata);
        matchingEndpoints = matchingEndpoints.filter(ep =>
          required.every(([key, value]) => ep.metadata[key] === value)
        );
      }

      const queryTime = Date.now() - startTime;
      this.updateQueryMetrics(queryTime, true);
      logger.debug(`Service discovery query completed`, {
        query,
        resultsCount: matchingEndpoints.length,
        queryTime,
      });
      return matchingEndpoints;
    } catch (error) {
      const queryTime = Date.now() - startTime;
      this.updateQueryMetrics(queryTime, false);
      logger.error('Service discovery query failed', { error, query });
      throw error;
    }
  }

  /**
   * Picks one healthy endpoint for `serviceName` using `strategy`.
   *
   * `discoverServices` matches names by substring, so a request for `"auth"`
   * would also see `"auth-admin"`. To avoid routing traffic to a different
   * service, endpoints whose name equals `serviceName` (case-insensitively)
   * are preferred; the substring matches are used only when no exact match
   * exists.
   *
   * @returns The chosen endpoint, or `null` when nothing healthy matches.
   */
  async selectEndpoint(
    serviceName: string,
    strategy: LoadBalancingStrategy = { name: 'round-robin' }
  ): Promise<ServiceEndpoint | null> {
    const candidates = await this.discoverServices({ serviceName, healthyOnly: true });
    if (candidates.length === 0) {
      return null;
    }

    const wantedName = serviceName.toLowerCase();
    const exactMatches = candidates.filter(ep => ep.serviceName.toLowerCase() === wantedName);
    const endpoints = exactMatches.length > 0 ? exactMatches : candidates;

    switch (strategy.name) {
      case 'round-robin':
        return this.selectRoundRobin(serviceName, endpoints);
      case 'weighted':
        return this.selectWeighted(endpoints);
      case 'least-connections':
        return this.selectLeastConnections(endpoints);
      case 'random':
        return this.selectRandom(endpoints);
      case 'health-aware':
        return this.selectHealthAware(endpoints);
      default:
        return endpoints[0];
    }
  }

  /** Cycles through `endpoints` using a per-service counter. */
  private selectRoundRobin(serviceName: string, endpoints: ServiceEndpoint[]): ServiceEndpoint {
    const currentIndex = this.roundRobinCounters.get(serviceName) ?? 0;
    const selectedEndpoint = endpoints[currentIndex % endpoints.length];
    this.roundRobinCounters.set(serviceName, currentIndex + 1);
    return selectedEndpoint;
  }

  /**
   * Random pick where each endpoint's chance is proportional to its weight.
   * Falls back to a uniform pick when the weights do not sum to a positive
   * number, since proportional selection is undefined in that case.
   */
  private selectWeighted(endpoints: ServiceEndpoint[]): ServiceEndpoint {
    const totalWeight = endpoints.reduce((sum, ep) => sum + ep.weight, 0);
    if (!(totalWeight > 0)) {
      return this.selectRandom(endpoints);
    }

    const threshold = Math.random() * totalWeight;
    let cumulativeWeight = 0;
    for (const endpoint of endpoints) {
      cumulativeWeight += endpoint.weight;
      // Strict comparison so a zero-weight endpoint can never be selected.
      if (threshold < cumulativeWeight) {
        return endpoint;
      }
    }
    // Only reachable through floating-point rounding.
    return endpoints[0];
  }

  /** Returns the endpoint with the fewest tracked connections (first wins ties). */
  private selectLeastConnections(endpoints: ServiceEndpoint[]): ServiceEndpoint {
    return endpoints.reduce((least, current) => {
      const leastConnections = this.connectionTracking.get(least.id) ?? 0;
      const currentConnections = this.connectionTracking.get(current.id) ?? 0;
      return currentConnections < leastConnections ? current : least;
    });
  }

  /** Uniform random pick. */
  private selectRandom(endpoints: ServiceEndpoint[]): ServiceEndpoint {
    return endpoints[Math.floor(Math.random() * endpoints.length)];
  }

  /**
   * Weighted pick restricted to healthy endpoints, preferring those whose
   * last health check is younger than `FRESH_HEALTH_WINDOW_MS`.
   */
  private selectHealthAware(endpoints: ServiceEndpoint[]): ServiceEndpoint {
    const healthyEndpoints = endpoints.filter(ep => ep.status === 'healthy');
    const now = Date.now();
    const recentlyChecked = healthyEndpoints.filter(
      ep => now - ep.lastHealthCheck.getTime() < FRESH_HEALTH_WINDOW_MS
    );
    const targetEndpoints = recentlyChecked.length > 0 ? recentlyChecked : healthyEndpoints;
    return this.selectWeighted(targetEndpoints);
  }

  /**
   * Adjusts the tracked connection count of an endpoint.
   *
   * @param endpointId Endpoint whose counter changes.
   * @param delta Amount to add (negative to release); the count never drops below 0.
   */
  trackConnection(endpointId: string, delta: number = 1): void {
    const current = this.connectionTracking.get(endpointId) ?? 0;
    const newCount = Math.max(0, current + delta);
    this.connectionTracking.set(endpointId, newCount);
  }

  /**
   * Creates a named group from the healthy endpoints currently matching
   * `serviceName`, stores it in memory and mirrors it to Redis.
   *
   * `healthCheckInterval` defaults to 30000 and `failureThreshold` to 3; a
   * falsy value (including 0) also selects the default.
   *
   * @throws Re-throws discovery or Redis errors after logging them.
   */
  async createServiceGroup(
    name: string,
    serviceName: string,
    strategy: LoadBalancingStrategy,
    options: {
      healthCheckInterval?: number;
      failureThreshold?: number;
    } = {}
  ): Promise<void> {
    try {
      const services = await this.discoverServices({ serviceName });
      const serviceGroup: ServiceGroup = {
        name,
        services,
        strategy,
        healthCheckInterval: options.healthCheckInterval || 30000,
        failureThreshold: options.failureThreshold || 3,
      };

      this.serviceGroups.set(name, serviceGroup);
      await this.redis.setJSON(`service-group:${name}`, serviceGroup, CACHE_TTL);

      logger.info(`Created service group`, {
        groupName: name,
        serviceName,
        endpointCount: services.length,
        strategy: strategy.name,
      });
    } catch (error) {
      logger.error(`Failed to create service group ${name}`, { error });
      throw error;
    }
  }

  /**
   * Looks a group up in memory, then in Redis. A Redis hit is stored in memory
   * for subsequent calls. Redis errors are logged and reported as "not found".
   *
   * @returns The group, or `null` when it is unknown or the cache read failed.
   */
  async getServiceGroup(name: string): Promise<ServiceGroup | null> {
    const group = this.serviceGroups.get(name);
    if (group) {
      return group;
    }

    try {
      const cachedGroup = await this.redis.getJSON<ServiceGroup>(`service-group:${name}`);
      if (cachedGroup) {
        const revived = this.reviveServiceGroup(cachedGroup);
        this.serviceGroups.set(name, revived);
        return revived;
      }
    } catch (error) {
      logger.error(`Failed to load service group ${name} from cache`, { error });
    }
    return null;
  }

  /**
   * Rebuilds `lastHealthCheck` as a real `Date` on every endpoint of a cached
   * group. NOTE(review): assumes `getJSON` returns JSON-parsed data, in which
   * dates arrive as strings; `new Date(x)` is harmless if `x` already is a Date.
   */
  private reviveServiceGroup(group: ServiceGroup): ServiceGroup {
    if (!Array.isArray(group.services)) {
      return group;
    }
    const services = group.services.map(ep => ({
      ...ep,
      lastHealthCheck: new Date(ep.lastHealthCheck),
    }));
    return { ...group, services };
  }

  /** Deletes a group from memory and from Redis. */
  async removeServiceGroup(name: string): Promise<void> {
    this.serviceGroups.delete(name);
    await this.redis.getClient().del(`service-group:${name}`);
    logger.info(`Removed service group ${name}`);
  }

  /** Starts the periodic health check, replacing any timer already running. */
  private async startHealthChecking(): Promise<void> {
    // A second initialize() must not leave the previous timer running.
    if (this.healthCheckInterval) {
      clearInterval(this.healthCheckInterval);
    }

    this.healthCheckInterval = setInterval(() => {
      this.performHealthChecks().catch((error: unknown) => {
        logger.error('Periodic health check run failed', { error });
      });
    }, HEALTH_CHECK_PERIOD_MS);
    logger.info('Started health checking');
  }

  /**
   * Re-evaluates every endpoint against the registry.
   *
   * `lastHealthCheck` is refreshed on every check so that health-aware
   * selection can tell freshly verified endpoints from stale ones; Redis is
   * written and a log line emitted only when the status actually changes.
   */
  private async performHealthChecks(): Promise<void> {
    const endpoints = Array.from(this.endpoints.values());

    const healthCheckPromises = endpoints.map(async (endpoint) => {
      try {
        const isHealthy = await this.checkEndpointHealth(endpoint);
        const newStatus = isHealthy ? 'healthy' : 'unhealthy';
        const statusChanged = endpoint.status !== newStatus;

        endpoint.status = newStatus;
        endpoint.lastHealthCheck = new Date();

        if (statusChanged) {
          await this.redis.setJSON(`endpoint:${endpoint.id}`, endpoint, CACHE_TTL);
          logger.info(`Endpoint health status changed`, {
            endpointId: endpoint.id,
            serviceName: endpoint.serviceName,
            newStatus,
          });
        }
      } catch (error) {
        logger.error(`Health check failed for endpoint ${endpoint.id}`, { error });
      }
    });

    await Promise.allSettled(healthCheckPromises);
    this.updateMetrics();
  }

  /**
   * Health is derived from the registry's own record: healthy only when the
   * service is still registered with status `'healthy'`. Lookup errors count
   * as unhealthy.
   */
  private async checkEndpointHealth(endpoint: ServiceEndpoint): Promise<boolean> {
    try {
      const service = this.registry.getService(endpoint.id);
      return service?.status === 'healthy';
    } catch (error) {
      return false;
    }
  }

  /** Recomputes the endpoint-related counters from the current endpoint map. */
  private updateMetrics(): void {
    const endpoints = Array.from(this.endpoints.values());
    this.metrics.totalEndpoints = endpoints.length;
    this.metrics.healthyEndpoints = endpoints.filter(ep => ep.status === 'healthy').length;
    this.metrics.unhealthyEndpoints = endpoints.filter(ep => ep.status === 'unhealthy').length;

    const uniqueServices = new Set(endpoints.map(ep => ep.serviceName));
    this.metrics.servicesDiscovered = uniqueServices.size;
  }

  /**
   * Records the outcome of one query. Successful queries feed the running
   * mean of `averageQueryTime`; failed ones only bump `failedQueries`.
   */
  private updateQueryMetrics(queryTime: number, success: boolean): void {
    if (!success) {
      this.metrics.failedQueries++;
      return;
    }

    this.metrics.successfulQueries++;
    const currentAvg = this.metrics.averageQueryTime;
    const successfulCount = this.metrics.successfulQueries;
    this.metrics.averageQueryTime =
      (currentAvg * (successfulCount - 1) + queryTime) / successfulCount;
  }

  /** Returns all known endpoints regardless of status. */
  getAllEndpoints(): ServiceEndpoint[] {
    return Array.from(this.endpoints.values());
  }

  /** Returns the endpoint with the given id, if known. */
  getEndpoint(endpointId: string): ServiceEndpoint | undefined {
    return this.endpoints.get(endpointId);
  }

  /** Returns all service groups currently held in memory. */
  getServiceGroups(): ServiceGroup[] {
    return Array.from(this.serviceGroups.values());
  }

  /** Returns a shallow copy of the current metrics. */
  getMetrics(): DiscoveryMetrics {
    return { ...this.metrics };
  }

  /**
   * Combines registry and Redis health into one report. Never throws: a
   * failing dependency check is reported as `healthy: false` with the error
   * message in `details`.
   */
  async healthCheck(): Promise<{ healthy: boolean; details: any }> {
    try {
      const registryHealth = await this.registry.healthCheck();
      const redisHealth = await this.redis.healthCheck();

      return {
        healthy: registryHealth.healthy && redisHealth.healthy,
        details: {
          registry: registryHealth,
          redis: redisHealth,
          // Copy so callers cannot mutate the live counters.
          metrics: { ...this.metrics },
          endpoints: this.metrics.totalEndpoints,
          healthyEndpoints: this.metrics.healthyEndpoints,
          serviceGroups: this.serviceGroups.size,
        },
      };
    } catch (error) {
      return {
        healthy: false,
        details: { error: error instanceof Error ? error.message : 'Unknown error' },
      };
    }
  }

  /**
   * Stops background work and clears all in-memory state. Idempotent:
   * repeated or concurrent calls share the same shutdown promise.
   */
  async shutdown(): Promise<void> {
    if (this.shutdownPromise) {
      return this.shutdownPromise;
    }
    this.shutdownPromise = this.performShutdown();
    return this.shutdownPromise;
  }

  /** Actual shutdown sequence; invoked at most once via `shutdown()`. */
  private async performShutdown(): Promise<void> {
    logger.info('Shutting down Service Discovery...');

    // Release the process-level listeners installed by the constructor.
    process.removeListener('SIGTERM', this.signalHandler);
    process.removeListener('SIGINT', this.signalHandler);

    if (this.healthCheckInterval) {
      clearInterval(this.healthCheckInterval);
      this.healthCheckInterval = null;
    }

    try {
      await this.messageBus.unsubscribeFromStream(SERVICE_EVENTS_STREAM, {
        group: SERVICE_EVENTS_GROUP,
        consumer: `discovery-${process.pid}`,
      });
    } catch (error) {
      // Best effort: local cleanup must still happen.
      logger.error('Failed to unsubscribe from service events', { error });
    }

    this.endpoints.clear();
    this.serviceGroups.clear();
    this.connectionTracking.clear();
    this.roundRobinCounters.clear();

    logger.info('Service Discovery shutdown complete');
  }
}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Coder-RL/Claude_MCPServer_Dev1'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.