Skip to main content
Glama
by Coder-RL
service-registry.ts21 kB
import { EventEmitter } from 'events'; import * as crypto from 'crypto'; export interface ServiceInstance { id: string; serviceName: string; version: string; host: string; port: number; protocol: 'http' | 'https' | 'grpc' | 'tcp' | 'udp'; metadata: Record<string, any>; tags: string[]; healthCheck: HealthCheckConfig; status: 'starting' | 'healthy' | 'unhealthy' | 'draining' | 'stopped'; registeredAt: Date; lastHeartbeat: Date; metrics: ServiceMetrics; } export interface HealthCheckConfig { enabled: boolean; path?: string; method?: string; interval: number; // seconds timeout: number; // seconds retries: number; gracePeriod: number; // seconds headers?: Record<string, string>; expectedStatus?: number[]; expectedBody?: string; tcp?: boolean; } export interface ServiceMetrics { requestCount: number; errorCount: number; responseTime: number; lastRequestTime: Date; uptime: number; memoryUsage: number; cpuUsage: number; } export interface ServiceDiscoveryQuery { serviceName?: string; version?: string; tags?: string[]; metadata?: Record<string, any>; status?: string[]; limit?: number; excludeUnhealthy?: boolean; } export interface LoadBalancingStrategy { algorithm: 'round-robin' | 'least-connections' | 'random' | 'weighted' | 'ip-hash' | 'consistent-hash'; healthyOnly: boolean; stickySession?: { enabled: boolean; key: string; // cookie name or header name ttl: number; // seconds }; weights?: Record<string, number>; // instance id -> weight } export interface ServiceRoute { id: string; pattern: string; method?: string; headers?: Record<string, string>; serviceName: string; version?: string; rewrite?: { path?: string; headers?: Record<string, string>; }; timeout: number; retries: number; loadBalancing: LoadBalancingStrategy; middleware: string[]; enabled: boolean; } export class ServiceRegistry extends EventEmitter { private services = new Map<string, ServiceInstance>(); private routes = new Map<string, ServiceRoute>(); private healthCheckTimers = new Map<string, NodeJS.Timeout>(); private stickySessionStore = new Map<string, { instanceId: string; expires: number }>(); private connectionCounts = new Map<string, number>(); private lastSelectedInstance = new Map<string, string>(); private consistentHashRing = new Map<string, string[]>(); constructor() { super(); this.startCleanupProcess(); } registerService(service: Omit<ServiceInstance, 'id' | 'registeredAt' | 'lastHeartbeat' | 'metrics'>): string { const serviceInstance: ServiceInstance = { id: this.generateServiceId(service.serviceName, service.host, service.port), registeredAt: new Date(), lastHeartbeat: new Date(), metrics: { requestCount: 0, errorCount: 0, responseTime: 0, lastRequestTime: new Date(), uptime: 0, memoryUsage: 0, cpuUsage: 0 }, ...service }; this.services.set(serviceInstance.id, serviceInstance); // Start health checking if enabled if (serviceInstance.healthCheck.enabled) { this.startHealthCheck(serviceInstance.id); } // Update consistent hash ring this.updateConsistentHashRing(serviceInstance.serviceName); this.emit('service-registered', serviceInstance); return serviceInstance.id; } deregisterService(serviceId: string): boolean { const service = this.services.get(serviceId); if (!service) { return false; } // Stop health checking const healthTimer = this.healthCheckTimers.get(serviceId); if (healthTimer) { clearInterval(healthTimer); this.healthCheckTimers.delete(serviceId); } // Remove from registry this.services.delete(serviceId); // Update consistent hash ring this.updateConsistentHashRing(service.serviceName); // Clean up connection counts this.connectionCounts.delete(serviceId); this.emit('service-deregistered', { serviceId, serviceName: service.serviceName }); return true; } heartbeat(serviceId: string, metrics?: Partial<ServiceMetrics>): boolean { const service = this.services.get(serviceId); if (!service) { return false; } service.lastHeartbeat = new Date(); if (metrics) { Object.assign(service.metrics, metrics); } // Update uptime service.metrics.uptime = Date.now() - service.registeredAt.getTime(); this.emit('heartbeat-received', { serviceId, serviceName: service.serviceName }); return true; } discoverServices(query: ServiceDiscoveryQuery): ServiceInstance[] { let results = Array.from(this.services.values()); // Filter by service name if (query.serviceName) { results = results.filter(s => s.serviceName === query.serviceName); } // Filter by version if (query.version) { results = results.filter(s => s.version === query.version); } // Filter by tags if (query.tags && query.tags.length > 0) { results = results.filter(s => query.tags!.every(tag => s.tags.includes(tag)) ); } // Filter by metadata if (query.metadata) { results = results.filter(s => { return Object.entries(query.metadata!).every(([key, value]) => s.metadata[key] === value ); }); } // Filter by status if (query.status && query.status.length > 0) { results = results.filter(s => query.status!.includes(s.status)); } // Exclude unhealthy services if (query.excludeUnhealthy) { results = results.filter(s => s.status === 'healthy'); } // Apply limit if (query.limit && query.limit > 0) { results = results.slice(0, query.limit); } return results; } selectInstance(serviceName: string, strategy: LoadBalancingStrategy, context?: any): ServiceInstance | null { const candidates = this.discoverServices({ serviceName, status: strategy.healthyOnly ? ['healthy'] : ['healthy', 'starting'], excludeUnhealthy: strategy.healthyOnly }); if (candidates.length === 0) { return null; } // Handle sticky sessions if (strategy.stickySession?.enabled && context?.sessionKey) { const sessionId = this.getSessionId(context.sessionKey, strategy.stickySession.key); const stickyInstance = this.getStickyInstance(sessionId); if (stickyInstance && candidates.find(c => c.id === stickyInstance)) { return this.services.get(stickyInstance) || null; } } let selectedInstance: ServiceInstance; switch (strategy.algorithm) { case 'round-robin': selectedInstance = this.selectRoundRobin(candidates, serviceName); break; case 'least-connections': selectedInstance = this.selectLeastConnections(candidates); break; case 'random': selectedInstance = this.selectRandom(candidates); break; case 'weighted': selectedInstance = this.selectWeighted(candidates, strategy.weights || {}); break; case 'ip-hash': selectedInstance = this.selectIpHash(candidates, context?.clientIp || ''); break; case 'consistent-hash': selectedInstance = this.selectConsistentHash(serviceName, context?.hashKey || ''); break; default: selectedInstance = candidates[0]; } // Update sticky session if enabled if (strategy.stickySession?.enabled && context?.sessionKey) { const sessionId = this.getSessionId(context.sessionKey, strategy.stickySession.key); this.setStickyInstance(sessionId, selectedInstance.id, strategy.stickySession.ttl); } // Update connection count this.incrementConnectionCount(selectedInstance.id); return selectedInstance; } private selectRoundRobin(candidates: ServiceInstance[], serviceName: string): ServiceInstance { const lastSelected = this.lastSelectedInstance.get(serviceName); if (!lastSelected) { const selected = candidates[0]; this.lastSelectedInstance.set(serviceName, selected.id); return selected; } const lastIndex = candidates.findIndex(c => c.id === lastSelected); const nextIndex = (lastIndex + 1) % candidates.length; const selected = candidates[nextIndex]; this.lastSelectedInstance.set(serviceName, selected.id); return selected; } private selectLeastConnections(candidates: ServiceInstance[]): ServiceInstance { return candidates.reduce((least, current) => { const leastConnections = this.connectionCounts.get(least.id) || 0; const currentConnections = this.connectionCounts.get(current.id) || 0; return currentConnections < leastConnections ? current : least; }); } private selectRandom(candidates: ServiceInstance[]): ServiceInstance { const randomIndex = Math.floor(Math.random() * candidates.length); return candidates[randomIndex]; } private selectWeighted(candidates: ServiceInstance[], weights: Record<string, number>): ServiceInstance { const totalWeight = candidates.reduce((sum, instance) => sum + (weights[instance.id] || 1), 0 ); let random = Math.random() * totalWeight; for (const instance of candidates) { const weight = weights[instance.id] || 1; random -= weight; if (random <= 0) { return instance; } } return candidates[candidates.length - 1]; } private selectIpHash(candidates: ServiceInstance[], clientIp: string): ServiceInstance { const hash = crypto.createHash('md5').update(clientIp).digest('hex'); const index = parseInt(hash.substring(0, 8), 16) % candidates.length; return candidates[index]; } private selectConsistentHash(serviceName: string, hashKey: string): ServiceInstance { const ring = this.consistentHashRing.get(serviceName); if (!ring || ring.length === 0) { // Fallback to first available instance const candidates = this.discoverServices({ serviceName, excludeUnhealthy: true }); return candidates.length > 0 ? candidates[0] : null!; } const hash = crypto.createHash('md5').update(hashKey).digest('hex'); const position = parseInt(hash.substring(0, 8), 16); // Find the first node >= position for (const nodeHash of ring) { if (parseInt(nodeHash, 16) >= position) { const instanceId = this.getInstanceFromHash(serviceName, nodeHash); const instance = this.services.get(instanceId); if (instance) { return instance; } } } // Wrap around to first node const firstNodeHash = ring[0]; const instanceId = this.getInstanceFromHash(serviceName, firstNodeHash); return this.services.get(instanceId) || null!; } private updateConsistentHashRing(serviceName: string): void { const instances = this.discoverServices({ serviceName }); const ring: string[] = []; // Create virtual nodes for better distribution const virtualNodes = 150; for (const instance of instances) { for (let i = 0; i < virtualNodes; i++) { const hash = crypto.createHash('md5') .update(`${instance.id}:${i}`) .digest('hex') .substring(0, 8); ring.push(hash); } } ring.sort((a, b) => parseInt(a, 16) - parseInt(b, 16)); this.consistentHashRing.set(serviceName, ring); } private getInstanceFromHash(serviceName: string, hash: string): string { // This is a simplified implementation // In practice, you'd need to maintain a mapping from hash to instance const instances = this.discoverServices({ serviceName }); const index = parseInt(hash, 16) % instances.length; return instances[index]?.id || ''; } private generateServiceId(serviceName: string, host: string, port: number): string { const data = `${serviceName}:${host}:${port}:${Date.now()}`; return crypto.createHash('md5').update(data).digest('hex').substring(0, 16); } private startHealthCheck(serviceId: string): void { const service = this.services.get(serviceId); if (!service || !service.healthCheck.enabled) { return; } const checkHealth = async () => { try { const isHealthy = await this.performHealthCheck(service); const previousStatus = service.status; if (isHealthy) { service.status = service.status === 'starting' ? 'healthy' : service.status; if (service.status !== 'healthy' && service.status !== 'draining') { service.status = 'healthy'; } } else { service.status = 'unhealthy'; } if (previousStatus !== service.status) { this.emit('service-status-changed', { serviceId, serviceName: service.serviceName, previousStatus, currentStatus: service.status }); } } catch (error) { service.status = 'unhealthy'; this.emit('health-check-failed', { serviceId, serviceName: service.serviceName, error: (error as Error).message }); } }; // Perform initial check after grace period setTimeout(checkHealth, service.healthCheck.gracePeriod * 1000); // Set up regular health checks const timer = setInterval(checkHealth, service.healthCheck.interval * 1000); this.healthCheckTimers.set(serviceId, timer); } private async performHealthCheck(service: ServiceInstance): Promise<boolean> { const config = service.healthCheck; if (config.tcp) { return this.performTcpHealthCheck(service); } if (config.path) { return this.performHttpHealthCheck(service); } // Default to always healthy if no specific check is configured return true; } private async performTcpHealthCheck(service: ServiceInstance): Promise<boolean> { return new Promise((resolve) => { const net = require('net'); const socket = new net.Socket(); const timeout = setTimeout(() => { socket.destroy(); resolve(false); }, service.healthCheck.timeout * 1000); socket.connect(service.port, service.host, () => { clearTimeout(timeout); socket.destroy(); resolve(true); }); socket.on('error', () => { clearTimeout(timeout); resolve(false); }); }); } private async performHttpHealthCheck(service: ServiceInstance): Promise<boolean> { const url = `${service.protocol}://${service.host}:${service.port}${service.healthCheck.path}`; try { const controller = new AbortController(); const timeoutId = setTimeout(() => controller.abort(), service.healthCheck.timeout * 1000); const response = await fetch(url, { method: service.healthCheck.method || 'GET', headers: service.healthCheck.headers || {}, signal: controller.signal }); clearTimeout(timeoutId); // Check status code const expectedStatuses = service.healthCheck.expectedStatus || [200]; if (!expectedStatuses.includes(response.status)) { return false; } // Check response body if expected if (service.healthCheck.expectedBody) { const body = await response.text(); return body.includes(service.healthCheck.expectedBody); } return true; } catch (error) { return false; } } private getSessionId(sessionKey: string, key: string): string { // Extract session identifier from request context return crypto.createHash('md5').update(`${sessionKey}:${key}`).digest('hex'); } private getStickyInstance(sessionId: string): string | null { const session = this.stickySessionStore.get(sessionId); if (!session || session.expires < Date.now()) { this.stickySessionStore.delete(sessionId); return null; } return session.instanceId; } private setStickyInstance(sessionId: string, instanceId: string, ttl: number): void { this.stickySessionStore.set(sessionId, { instanceId, expires: Date.now() + ttl * 1000 }); } private incrementConnectionCount(instanceId: string): void { const current = this.connectionCounts.get(instanceId) || 0; this.connectionCounts.set(instanceId, current + 1); } decrementConnectionCount(instanceId: string): void { const current = this.connectionCounts.get(instanceId) || 0; this.connectionCounts.set(instanceId, Math.max(0, current - 1)); } private startCleanupProcess(): void { setInterval(() => { this.cleanupStaleServices(); this.cleanupExpiredSessions(); }, 60000); // Every minute } private cleanupStaleServices(): void { const staleThreshold = 3 * 60 * 1000; // 3 minutes const now = Date.now(); for (const [serviceId, service] of this.services) { const timeSinceHeartbeat = now - service.lastHeartbeat.getTime(); if (timeSinceHeartbeat > staleThreshold && service.status !== 'stopped') { service.status = 'unhealthy'; this.emit('service-stale', { serviceId, serviceName: service.serviceName }); // Remove completely stale services (no heartbeat for 10 minutes) if (timeSinceHeartbeat > 10 * 60 * 1000) { this.deregisterService(serviceId); } } } } private cleanupExpiredSessions(): void { const now = Date.now(); for (const [sessionId, session] of this.stickySessionStore) { if (session.expires < now) { this.stickySessionStore.delete(sessionId); } } } // Route management addRoute(route: ServiceRoute): void { this.routes.set(route.id, route); this.emit('route-added', route); } removeRoute(routeId: string): void { const route = this.routes.get(routeId); if (route) { this.routes.delete(routeId); this.emit('route-removed', { routeId }); } } findRoute(path: string, method: string, headers: Record<string, string> = {}): ServiceRoute | null { for (const route of this.routes.values()) { if (!route.enabled) continue; // Check method if (route.method && route.method.toUpperCase() !== method.toUpperCase()) { continue; } // Check path pattern if (!this.matchPath(path, route.pattern)) { continue; } // Check headers if (route.headers && !this.matchHeaders(headers, route.headers)) { continue; } return route; } return null; } private matchPath(path: string, pattern: string): boolean { // Simple pattern matching - in production use a proper router const regexPattern = pattern .replace(/\*/g, '.*') .replace(/\{[^}]+\}/g, '[^/]+'); const regex = new RegExp(`^${regexPattern}$`); return regex.test(path); } private matchHeaders(headers: Record<string, string>, requiredHeaders: Record<string, string>): boolean { for (const [key, value] of Object.entries(requiredHeaders)) { if (headers[key.toLowerCase()] !== value) { return false; } } return true; } getServices(): ServiceInstance[] { return Array.from(this.services.values()); } getService(serviceId: string): ServiceInstance | null { return this.services.get(serviceId) || null; } getRoutes(): ServiceRoute[] { return Array.from(this.routes.values()); } getServicesByName(serviceName: string): ServiceInstance[] { return Array.from(this.services.values()) .filter(s => s.serviceName === serviceName); } getStats(): any { const services = Array.from(this.services.values()); return { services: { total: services.length, byStatus: services.reduce((acc, s) => { acc[s.status] = (acc[s.status] || 0) + 1; return acc; }, {} as Record<string, number>), byName: services.reduce((acc, s) => { acc[s.serviceName] = (acc[s.serviceName] || 0) + 1; return acc; }, {} as Record<string, number>) }, routes: { total: this.routes.size, enabled: Array.from(this.routes.values()).filter(r => r.enabled).length }, healthChecks: this.healthCheckTimers.size, stickySessions: this.stickySessionStore.size, connectionCounts: Object.fromEntries(this.connectionCounts) }; } destroy(): void { // Clear all health check timers for (const timer of this.healthCheckTimers.values()) { clearInterval(timer); } this.services.clear(); this.routes.clear(); this.healthCheckTimers.clear(); this.stickySessionStore.clear(); this.connectionCounts.clear(); this.lastSelectedInstance.clear(); this.consistentHashRing.clear(); this.removeAllListeners(); } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Coder-RL/Claude_MCPServer_Dev1'

If you have feedback or need assistance with the MCP directory API, please join our Discord server