// Copyright 2025 Chris Bunting
// Brief: Cross-server communication service for MCP Code Analysis & Quality Server
// Scope: Handles communication between Static Analysis, Dependency Analysis, and Code Complexity Analyzer servers
import { EventEmitter } from 'events';
import { v4 as uuidv4 } from 'uuid';
import {
CrossServerMessage,
CrossServerResponse,
ServerRegistration,
ServerCapability,
HealthStatus,
MessageType,
ServerSource,
PriorityLevel,
LoggerInterface
} from '@mcp-code-analysis/shared-types';
export interface CommunicationServiceConfig {
heartbeatInterval: number;
responseTimeout: number;
maxRetries: number;
enableBroadcast: boolean;
enableCompression: boolean;
}
export interface ServerEndpoint {
id: string;
url: string;
serverType: ServerSource;
isActive: boolean;
lastHeartbeat: Date;
capabilities: ServerCapability[];
}
export class CrossServerCommunicationService extends EventEmitter {
private servers: Map<string, ServerRegistration> = new Map();
private endpoints: Map<string, ServerEndpoint> = new Map();
private messageQueue: Map<string, CrossServerMessage[]> = new Map();
private responseHandlers: Map<string, (response: CrossServerResponse) => void> = new Map();
private heartbeatInterval: ReturnType<typeof setInterval> | null = null;
private config: CommunicationServiceConfig;
private logger: LoggerInterface;
constructor(config: CommunicationServiceConfig, logger: LoggerInterface) {
super();
this.config = config;
this.logger = logger;
this.setupEventHandlers();
}
private setupEventHandlers(): void {
this.on('message', this.handleMessage.bind(this));
this.on('registration', this.handleRegistration.bind(this));
this.on('heartbeat', this.handleHeartbeat.bind(this));
this.on('deregistration', this.handleDeregistration.bind(this));
}
async initialize(): Promise<void> {
this.logger.info('Initializing Cross-Server Communication Service');
// Start heartbeat monitoring
this.startHeartbeatMonitoring();
// Initialize message processing
this.startMessageProcessing();
this.logger.info('Cross-Server Communication Service initialized successfully');
}
async registerServer(registration: ServerRegistration): Promise<void> {
try {
this.logger.info(`Registering server: ${registration.name} (${registration.type})`);
// Validate registration
this.validateRegistration(registration);
// Store server registration
this.servers.set(registration.id, registration);
// Create endpoint if URL provided
if (registration.endpoint) {
const endpoint: ServerEndpoint = {
id: registration.id,
url: registration.endpoint,
serverType: registration.type,
isActive: true,
lastHeartbeat: new Date(),
capabilities: registration.capabilities
};
this.endpoints.set(registration.id, endpoint);
}
// Emit registration event
this.emit('registration', registration);
// Notify other servers
await this.broadcastMessage({
id: uuidv4(),
type: MessageType.REGISTRATION,
source: ServerSource.UNIFIED,
target: 'broadcast',
timestamp: new Date(),
payload: registration,
priority: PriorityLevel.MEDIUM
});
this.logger.info(`Server registered successfully: ${registration.name}`);
} catch (error) {
this.logger.error(`Failed to register server ${registration.name}:`, error);
throw error;
}
}
async deregisterServer(serverId: string): Promise<void> {
try {
const server = this.servers.get(serverId);
if (!server) {
throw new Error(`Server not found: ${serverId}`);
}
this.logger.info(`Deregistering server: ${server.name} (${server.type})`);
// Remove server registration
this.servers.delete(serverId);
// Remove endpoint
this.endpoints.delete(serverId);
// Clear message queue for this server
this.messageQueue.delete(serverId);
// Emit deregistration event
this.emit('deregistration', server);
// Notify other servers
await this.broadcastMessage({
id: uuidv4(),
type: MessageType.DEREGISTRATION,
source: ServerSource.UNIFIED,
target: 'broadcast',
timestamp: new Date(),
payload: { serverId, serverName: server.name },
priority: PriorityLevel.MEDIUM
});
this.logger.info(`Server deregistered successfully: ${server.name}`);
} catch (error) {
this.logger.error(`Failed to deregister server ${serverId}:`, error);
throw error;
}
}
async sendMessage(message: CrossServerMessage): Promise<CrossServerResponse> {
try {
this.logger.debug(`Sending message from ${message.source} to ${message.target}`);
// Validate message
this.validateMessage(message);
// If message requires response, set up handler
if (message.requiresResponse) {
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
this.responseHandlers.delete(message.id);
reject(new Error(`Message response timeout: ${message.id}`));
}, this.config.responseTimeout);
this.responseHandlers.set(message.id, (response: CrossServerResponse) => {
clearTimeout(timeout);
this.responseHandlers.delete(message.id);
resolve(response);
});
// Send message
this.deliverMessage(message);
});
} else {
// Fire and forget
await this.deliverMessage(message);
return {
id: uuidv4(),
correlationId: message.id,
source: ServerSource.UNIFIED,
target: message.source as ServerSource,
timestamp: new Date(),
success: true,
processingTime: 0
};
}
} catch (error) {
this.logger.error('Failed to send message:', error);
throw error;
}
}
async broadcastMessage(message: CrossServerMessage): Promise<void> {
if (!this.config.enableBroadcast) {
this.logger.warn('Broadcast is disabled');
return;
}
message.target = 'broadcast';
// Send to all registered servers except the source
const promises = Array.from(this.servers.values())
.filter(server => server.type !== message.source)
.map(server => {
const targetedMessage = { ...message, target: server.type };
return this.deliverMessage(targetedMessage).catch(error => {
this.logger.warn(`Failed to deliver broadcast message to ${server.name}:`, error);
});
});
await Promise.all(promises);
}
private async deliverMessage(message: CrossServerMessage): Promise<void> {
if (message.target === 'broadcast') {
await this.handleBroadcast(message);
return;
}
// Find target endpoint
const endpoint = Array.from(this.endpoints.values())
.find(ep => ep.serverType === message.target);
if (!endpoint) {
throw new Error(`No endpoint found for target: ${message.target}`);
}
if (!endpoint.isActive) {
// Queue message for later delivery
this.queueMessage(message.target, message);
this.logger.warn(`Endpoint ${message.target} is inactive, message queued`);
return;
}
try {
// In a real implementation, this would make HTTP/WebSocket calls
// For now, we'll emit the message internally
this.emit('message', message);
this.logger.debug(`Message delivered to ${message.target}: ${message.id}`);
} catch (error) {
this.logger.error(`Failed to deliver message to ${message.target}:`, error);
// Retry logic
if (this.shouldRetry(message)) {
await this.retryMessage(message);
} else {
throw error;
}
}
}
private async handleBroadcast(message: CrossServerMessage): Promise<void> {
// Handle broadcast messages
this.logger.debug(`Processing broadcast message from ${message.source}`);
// Emit to all interested parties
this.emit('broadcast', message);
}
private handleMessage(message: CrossServerMessage): void {
this.logger.debug(`Received message from ${message.source}: ${message.type}`);
switch (message.type) {
case MessageType.REQUEST:
this.handleRequest(message);
break;
case MessageType.RESPONSE:
this.handleResponse(message);
break;
case MessageType.NOTIFICATION:
this.handleNotification(message);
break;
default:
this.logger.warn(`Unknown message type: ${message.type}`);
}
}
private handleRequest(message: CrossServerMessage): void {
// Handle request messages
this.emit('request', message);
}
private handleResponse(message: CrossServerMessage): void {
// Handle response messages
const response = message.payload as CrossServerResponse;
const handler = this.responseHandlers.get(response.correlationId);
if (handler) {
handler(response);
this.responseHandlers.delete(response.correlationId);
} else {
this.logger.warn(`No handler found for response: ${response.correlationId}`);
}
}
private handleNotification(message: CrossServerMessage): void {
// Handle notification messages
this.emit('notification', message);
}
private handleRegistration(registration: ServerRegistration): void {
this.logger.info(`New server registered: ${registration.name}`);
this.servers.set(registration.id, registration);
}
private handleHeartbeat(heartbeat: { serverId: string; timestamp: Date }): void {
const endpoint = this.endpoints.get(heartbeat.serverId);
if (endpoint) {
endpoint.lastHeartbeat = heartbeat.timestamp;
endpoint.isActive = true;
this.logger.debug(`Heartbeat received from ${heartbeat.serverId}`);
}
}
private handleDeregistration(deregistration: { serverId: string; serverName: string }): void {
this.logger.info(`Server deregistered: ${deregistration.serverName}`);
this.servers.delete(deregistration.serverId);
this.endpoints.delete(deregistration.serverId);
}
private startHeartbeatMonitoring(): void {
this.heartbeatInterval = setInterval(() => {
this.checkServerHealth();
}, this.config.heartbeatInterval);
}
private async checkServerHealth(): Promise<void> {
const now = new Date();
const staleThreshold = new Date(now.getTime() - (this.config.heartbeatInterval * 2));
for (const [serverId, endpoint] of this.endpoints) {
if (endpoint.lastHeartbeat < staleThreshold) {
this.logger.warn(`Server ${serverId} appears to be stale`);
endpoint.isActive = false;
// Attempt to reactivate
await this.attemptReactivation(serverId);
}
}
}
private async attemptReactivation(serverId: string): Promise<void> {
const endpoint = this.endpoints.get(serverId);
if (!endpoint) return;
try {
// Send heartbeat request
await this.sendMessage({
id: uuidv4(),
type: MessageType.HEARTBEAT,
source: ServerSource.UNIFIED,
target: endpoint.serverType,
timestamp: new Date(),
payload: { timestamp: new Date() },
priority: PriorityLevel.HIGH,
requiresResponse: true
});
this.logger.info(`Reactivation attempt sent to ${serverId}`);
} catch (error) {
this.logger.error(`Failed to reactivate server ${serverId}:`, error);
}
}
private startMessageProcessing(): void {
// Process queued messages periodically
setInterval(() => {
this.processQueuedMessages();
}, 5000); // Process every 5 seconds
}
private async processQueuedMessages(): Promise<void> {
for (const [target, messages] of this.messageQueue) {
if (messages.length === 0) continue;
const endpoint = this.endpoints.get(target);
if (endpoint && endpoint.isActive) {
const message = messages.shift();
if (message) {
try {
await this.deliverMessage(message);
this.logger.debug(`Queued message delivered to ${target}`);
} catch (error) {
this.logger.error(`Failed to deliver queued message to ${target}:`, error);
// Re-queue for later
messages.unshift(message);
}
}
}
}
}
private queueMessage(target: ServerSource | 'broadcast', message: CrossServerMessage): void {
if (!this.messageQueue.has(target)) {
this.messageQueue.set(target, []);
}
this.messageQueue.get(target)!.push(message);
}
private shouldRetry(message: CrossServerMessage): boolean {
// Implement retry logic based on message priority and type
return message.priority === PriorityLevel.CRITICAL ||
message.priority === PriorityLevel.HIGH;
}
private async retryMessage(message: CrossServerMessage): Promise<void> {
// Implement retry with exponential backoff
const retryCount = (message as CrossServerMessage & { retryCount?: number }).retryCount ?? 0;
if (retryCount >= this.config.maxRetries) {
this.logger.error(`Max retries exceeded for message: ${message.id}`);
return;
}
(message as CrossServerMessage & { retryCount?: number }).retryCount = retryCount + 1;
// Calculate delay with exponential backoff
const delay = Math.pow(2, retryCount) * 1000;
setTimeout(() => {
this.deliverMessage(message);
}, delay);
}
private validateRegistration(registration: ServerRegistration): void {
if (!registration.id || !registration.name || !registration.type) {
throw new Error('Invalid registration: missing required fields');
}
if (this.servers.has(registration.id)) {
throw new Error(`Server already registered: ${registration.id}`);
}
}
private validateMessage(message: CrossServerMessage): void {
if (!message.id || !message.type || !message.source || !message.target) {
throw new Error('Invalid message: missing required fields');
}
}
async getRegisteredServers(): Promise<ServerRegistration[]> {
return Array.from(this.servers.values());
}
async getServerCapabilities(serverType: ServerSource): Promise<ServerCapability[]> {
const server = Array.from(this.servers.values())
.find(s => s.type === serverType);
return server?.capabilities ?? [];
}
async isServerAvailable(serverType: ServerSource): Promise<boolean> {
const endpoint = Array.from(this.endpoints.values())
.find(ep => ep.serverType === serverType);
return endpoint?.isActive ?? false;
}
async getServerHealth(serverId: string): Promise<HealthStatus | null> {
const server = this.servers.get(serverId);
return server?.healthStatus ?? null;
}
async shutdown(): Promise<void> {
this.logger.info('Shutting down Cross-Server Communication Service');
// Stop heartbeat monitoring
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
this.heartbeatInterval = null;
}
// Clear all handlers and queues
this.responseHandlers.clear();
this.messageQueue.clear();
// Notify all servers of shutdown
await this.broadcastMessage({
id: uuidv4(),
type: MessageType.NOTIFICATION,
source: ServerSource.UNIFIED,
target: 'broadcast',
timestamp: new Date(),
payload: { type: 'shutdown', message: 'Communication service shutting down' },
priority: PriorityLevel.HIGH
});
this.logger.info('Cross-Server Communication Service shutdown complete');
}
}