/**
* Service Registry Module
*
* Author: Yobie Benjamin
* Version: 0.2
* Date: July 28, 2025
*
* The ServiceRegistry maintains a catalog of all connected MCP services.
* It handles service discovery, health checking, and metadata management.
*/
import { EventEmitter } from 'eventemitter3';
import winston from 'winston';
const logger = winston.createLogger({
level: 'debug',
format: winston.format.simple()
});
/**
* Service metadata structure
* Contains all information about a connected MCP service
*/
export interface ServiceMetadata {
id: string;
name: string;
description: string;
version: string;
endpoint: string;
transport: 'stdio' | 'http' | 'websocket';
capabilities: {
tools: boolean;
resources: boolean;
prompts: boolean;
completion: boolean;
};
status: 'connected' | 'disconnected' | 'error';
lastHeartbeat?: Date;
metadata?: Record<string, any>;
}
/**
* Service health status
* Tracks the operational state of each service
*/
interface ServiceHealth {
serviceId: string;
healthy: boolean;
latencyMs: number;
lastCheck: Date;
errorCount: number;
consecutiveFailures: number;
}
/**
* Registry class that manages all connected MCP services
* Provides service discovery, health monitoring, and routing information
*/
export class ServiceRegistry extends EventEmitter {
private services: Map<string, ServiceMetadata>;
private health: Map<string, ServiceHealth>;
private healthCheckInterval: NodeJS.Timeout | null = null;
constructor() {
super();
/**
* Initialize service and health maps
* These maps store all service information and health status
*/
this.services = new Map();
this.health = new Map();
/**
* Start health monitoring
* Regular health checks ensure service availability
*/
this.startHealthMonitoring();
}
/**
* Register a new MCP service
* Adds the service to the registry and initializes health tracking
*/
registerService(metadata: ServiceMetadata): void {
logger.info(`Registering service: ${metadata.id}`, metadata);
/**
* Store service metadata
* This information is used for routing and capability checking
*/
this.services.set(metadata.id, metadata);
/**
* Initialize health tracking for the service
* Health status helps with failover and load balancing
*/
this.health.set(metadata.id, {
serviceId: metadata.id,
healthy: true,
latencyMs: 0,
lastCheck: new Date(),
errorCount: 0,
consecutiveFailures: 0
});
/**
* Emit registration event
* Other components can react to new service availability
*/
this.emit('service:registered', metadata);
}
/**
* Unregister an MCP service
* Removes the service from active registry
*/
unregisterService(serviceId: string): void {
logger.info(`Unregistering service: ${serviceId}`);
const service = this.services.get(serviceId);
if (!service) {
logger.warn(`Attempted to unregister unknown service: ${serviceId}`);
return;
}
/**
* Remove from registries
* Clean up all references to the service
*/
this.services.delete(serviceId);
this.health.delete(serviceId);
/**
* Emit unregistration event
* Notify other components of service removal
*/
this.emit('service:unregistered', service);
}
/**
* Get service metadata by ID
* Returns detailed information about a specific service
*/
getService(serviceId: string): ServiceMetadata | undefined {
return this.services.get(serviceId);
}
/**
* Check if a service is registered
* Quick check for service availability
*/
hasService(serviceId: string): boolean {
return this.services.has(serviceId);
}
/**
* List all registered service IDs
* Returns array of service identifiers
*/
listServices(): string[] {
return Array.from(this.services.keys());
}
/**
* Get all services with specific capability
* Useful for finding services that support certain operations
*/
getServicesWithCapability(capability: keyof ServiceMetadata['capabilities']): ServiceMetadata[] {
return Array.from(this.services.values()).filter(
service => service.capabilities[capability]
);
}
/**
* Get services by transport type
* Filter services based on their connection method
*/
getServicesByTransport(transport: ServiceMetadata['transport']): ServiceMetadata[] {
return Array.from(this.services.values()).filter(
service => service.transport === transport
);
}
/**
* Update service status
* Changes the operational status of a service
*/
updateServiceStatus(serviceId: string, status: ServiceMetadata['status']): void {
const service = this.services.get(serviceId);
if (!service) {
logger.warn(`Cannot update status for unknown service: ${serviceId}`);
return;
}
/**
* Update status and heartbeat
* Track when the service was last active
*/
service.status = status;
service.lastHeartbeat = new Date();
/**
* Emit status change event
* Components can react to service state changes
*/
this.emit('service:status-changed', {
serviceId,
oldStatus: service.status,
newStatus: status
});
}
/**
* Get health status for a service
* Returns current health metrics
*/
getServiceHealth(serviceId: string): ServiceHealth | undefined {
return this.health.get(serviceId);
}
/**
* Check if a service is healthy
* Quick health check for routing decisions
*/
isServiceHealthy(serviceId: string): boolean {
const health = this.health.get(serviceId);
return health?.healthy ?? false;
}
/**
* Get all healthy services
* Returns only services that are currently operational
*/
getHealthyServices(): ServiceMetadata[] {
return Array.from(this.services.values()).filter(
service => this.isServiceHealthy(service.id)
);
}
/**
* Update health status for a service
* Called after health checks or service interactions
*/
updateServiceHealth(
serviceId: string,
healthy: boolean,
latencyMs?: number,
error?: Error
): void {
const health = this.health.get(serviceId);
if (!health) {
logger.warn(`Cannot update health for unknown service: ${serviceId}`);
return;
}
/**
* Update health metrics
* Track success/failure patterns
*/
health.healthy = healthy;
health.lastCheck = new Date();
if (latencyMs !== undefined) {
health.latencyMs = latencyMs;
}
if (healthy) {
health.consecutiveFailures = 0;
} else {
health.errorCount++;
health.consecutiveFailures++;
/**
* Mark service as disconnected after multiple failures
* Prevents routing to consistently failing services
*/
if (health.consecutiveFailures >= 3) {
this.updateServiceStatus(serviceId, 'error');
}
}
/**
* Emit health update event
* Monitoring tools can track service health
*/
this.emit('service:health-updated', {
serviceId,
health: health,
error: error?.message
});
}
/**
* Find best service for a capability
* Considers health and latency for optimal routing
*/
findBestService(capability: keyof ServiceMetadata['capabilities']): ServiceMetadata | undefined {
const capableServices = this.getServicesWithCapability(capability);
/**
* Filter to healthy services only
* Avoid routing to unhealthy services
*/
const healthyServices = capableServices.filter(
service => this.isServiceHealthy(service.id)
);
if (healthyServices.length === 0) {
return undefined;
}
/**
* Sort by latency for optimal performance
* Choose the service with lowest latency
*/
healthyServices.sort((a, b) => {
const healthA = this.health.get(a.id)!;
const healthB = this.health.get(b.id)!;
return healthA.latencyMs - healthB.latencyMs;
});
return healthyServices[0];
}
/**
* Start health monitoring for all services
* Regular health checks ensure service availability
*/
private startHealthMonitoring(): void {
/**
* Set up periodic health checks
* Check every 30 seconds by default
*/
this.healthCheckInterval = setInterval(() => {
this.performHealthChecks();
}, 30000);
logger.info('Health monitoring started');
}
/**
* Perform health checks on all services
* Pings each service to verify availability
*/
private async performHealthChecks(): Promise<void> {
for (const [serviceId, service] of this.services) {
/**
* Skip services that are already marked as disconnected
* No need to check services we know are down
*/
if (service.status === 'disconnected') {
continue;
}
try {
/**
* Perform service-specific health check
* Different transports require different check methods
*/
const startTime = Date.now();
const healthy = await this.checkServiceHealth(service);
const latency = Date.now() - startTime;
this.updateServiceHealth(serviceId, healthy, latency);
} catch (error) {
logger.error(`Health check failed for ${serviceId}:`, error);
this.updateServiceHealth(serviceId, false, undefined, error as Error);
}
}
}
/**
* Check health of a specific service
* Implements transport-specific health check logic
*/
private async checkServiceHealth(service: ServiceMetadata): Promise<boolean> {
// This would be implemented based on transport type
// For now, we'll simulate with a simple check
switch (service.transport) {
case 'http':
// Would make HTTP health check request
return true;
case 'websocket':
// Would send WebSocket ping
return true;
case 'stdio':
// Would check process status
return true;
default:
return false;
}
}
/**
* Get registry statistics
* Provides overview of service registry state
*/
getStatistics(): {
totalServices: number;
healthyServices: number;
unhealthyServices: number;
byTransport: Record<string, number>;
byCapability: Record<string, number>;
} {
const stats = {
totalServices: this.services.size,
healthyServices: 0,
unhealthyServices: 0,
byTransport: {} as Record<string, number>,
byCapability: {} as Record<string, number>
};
for (const service of this.services.values()) {
// Count healthy vs unhealthy
if (this.isServiceHealthy(service.id)) {
stats.healthyServices++;
} else {
stats.unhealthyServices++;
}
// Count by transport
stats.byTransport[service.transport] =
(stats.byTransport[service.transport] || 0) + 1;
// Count by capability
for (const [cap, enabled] of Object.entries(service.capabilities)) {
if (enabled) {
stats.byCapability[cap] = (stats.byCapability[cap] || 0) + 1;
}
}
}
return stats;
}
/**
* Clean up resources
* Stop health monitoring and clear registries
*/
destroy(): void {
if (this.healthCheckInterval) {
clearInterval(this.healthCheckInterval);
this.healthCheckInterval = null;
}
this.services.clear();
this.health.clear();
this.removeAllListeners();
logger.info('Service registry destroyed');
}
}