Skip to main content
Glama
cbunting99

MCP Code Analysis & Quality Server

by cbunting99
CrossServerCommunicationService.ts16 kB
// Copyright 2025 Chris Bunting // Brief: Cross-server communication service for MCP Code Analysis & Quality Server // Scope: Handles communication between Static Analysis, Dependency Analysis, and Code Complexity Analyzer servers import { EventEmitter } from 'events'; import { v4 as uuidv4 } from 'uuid'; import { CrossServerMessage, CrossServerResponse, ServerRegistration, ServerCapability, HealthStatus, MessageType, ServerSource, PriorityLevel, LoggerInterface } from '@mcp-code-analysis/shared-types'; export interface CommunicationServiceConfig { heartbeatInterval: number; responseTimeout: number; maxRetries: number; enableBroadcast: boolean; enableCompression: boolean; } export interface ServerEndpoint { id: string; url: string; serverType: ServerSource; isActive: boolean; lastHeartbeat: Date; capabilities: ServerCapability[]; } export class CrossServerCommunicationService extends EventEmitter { private servers: Map<string, ServerRegistration> = new Map(); private endpoints: Map<string, ServerEndpoint> = new Map(); private messageQueue: Map<string, CrossServerMessage[]> = new Map(); private responseHandlers: Map<string, (response: CrossServerResponse) => void> = new Map(); private heartbeatInterval: ReturnType<typeof setInterval> | null = null; private config: CommunicationServiceConfig; private logger: LoggerInterface; constructor(config: CommunicationServiceConfig, logger: LoggerInterface) { super(); this.config = config; this.logger = logger; this.setupEventHandlers(); } private setupEventHandlers(): void { this.on('message', this.handleMessage.bind(this)); this.on('registration', this.handleRegistration.bind(this)); this.on('heartbeat', this.handleHeartbeat.bind(this)); this.on('deregistration', this.handleDeregistration.bind(this)); } async initialize(): Promise<void> { this.logger.info('Initializing Cross-Server Communication Service'); // Start heartbeat monitoring this.startHeartbeatMonitoring(); // Initialize message processing this.startMessageProcessing(); this.logger.info('Cross-Server Communication Service initialized successfully'); } async registerServer(registration: ServerRegistration): Promise<void> { try { this.logger.info(`Registering server: ${registration.name} (${registration.type})`); // Validate registration this.validateRegistration(registration); // Store server registration this.servers.set(registration.id, registration); // Create endpoint if URL provided if (registration.endpoint) { const endpoint: ServerEndpoint = { id: registration.id, url: registration.endpoint, serverType: registration.type, isActive: true, lastHeartbeat: new Date(), capabilities: registration.capabilities }; this.endpoints.set(registration.id, endpoint); } // Emit registration event this.emit('registration', registration); // Notify other servers await this.broadcastMessage({ id: uuidv4(), type: MessageType.REGISTRATION, source: ServerSource.UNIFIED, target: 'broadcast', timestamp: new Date(), payload: registration, priority: PriorityLevel.MEDIUM }); this.logger.info(`Server registered successfully: ${registration.name}`); } catch (error) { this.logger.error(`Failed to register server ${registration.name}:`, error); throw error; } } async deregisterServer(serverId: string): Promise<void> { try { const server = this.servers.get(serverId); if (!server) { throw new Error(`Server not found: ${serverId}`); } this.logger.info(`Deregistering server: ${server.name} (${server.type})`); // Remove server registration this.servers.delete(serverId); // Remove endpoint this.endpoints.delete(serverId); // Clear message queue for this server this.messageQueue.delete(serverId); // Emit deregistration event this.emit('deregistration', server); // Notify other servers await this.broadcastMessage({ id: uuidv4(), type: MessageType.DEREGISTRATION, source: ServerSource.UNIFIED, target: 'broadcast', timestamp: new Date(), payload: { serverId, serverName: server.name }, priority: PriorityLevel.MEDIUM }); this.logger.info(`Server deregistered successfully: ${server.name}`); } catch (error) { this.logger.error(`Failed to deregister server ${serverId}:`, error); throw error; } } async sendMessage(message: CrossServerMessage): Promise<CrossServerResponse> { try { this.logger.debug(`Sending message from ${message.source} to ${message.target}`); // Validate message this.validateMessage(message); // If message requires response, set up handler if (message.requiresResponse) { return new Promise((resolve, reject) => { const timeout = setTimeout(() => { this.responseHandlers.delete(message.id); reject(new Error(`Message response timeout: ${message.id}`)); }, this.config.responseTimeout); this.responseHandlers.set(message.id, (response: CrossServerResponse) => { clearTimeout(timeout); this.responseHandlers.delete(message.id); resolve(response); }); // Send message this.deliverMessage(message); }); } else { // Fire and forget await this.deliverMessage(message); return { id: uuidv4(), correlationId: message.id, source: ServerSource.UNIFIED, target: message.source as ServerSource, timestamp: new Date(), success: true, processingTime: 0 }; } } catch (error) { this.logger.error('Failed to send message:', error); throw error; } } async broadcastMessage(message: CrossServerMessage): Promise<void> { if (!this.config.enableBroadcast) { this.logger.warn('Broadcast is disabled'); return; } message.target = 'broadcast'; // Send to all registered servers except the source const promises = Array.from(this.servers.values()) .filter(server => server.type !== message.source) .map(server => { const targetedMessage = { ...message, target: server.type }; return this.deliverMessage(targetedMessage).catch(error => { this.logger.warn(`Failed to deliver broadcast message to ${server.name}:`, error); }); }); await Promise.all(promises); } private async deliverMessage(message: CrossServerMessage): Promise<void> { if (message.target === 'broadcast') { await this.handleBroadcast(message); return; } // Find target endpoint const endpoint = Array.from(this.endpoints.values()) .find(ep => ep.serverType === message.target); if (!endpoint) { throw new Error(`No endpoint found for target: ${message.target}`); } if (!endpoint.isActive) { // Queue message for later delivery this.queueMessage(message.target, message); this.logger.warn(`Endpoint ${message.target} is inactive, message queued`); return; } try { // In a real implementation, this would make HTTP/WebSocket calls // For now, we'll emit the message internally this.emit('message', message); this.logger.debug(`Message delivered to ${message.target}: ${message.id}`); } catch (error) { this.logger.error(`Failed to deliver message to ${message.target}:`, error); // Retry logic if (this.shouldRetry(message)) { await this.retryMessage(message); } else { throw error; } } } private async handleBroadcast(message: CrossServerMessage): Promise<void> { // Handle broadcast messages this.logger.debug(`Processing broadcast message from ${message.source}`); // Emit to all interested parties this.emit('broadcast', message); } private handleMessage(message: CrossServerMessage): void { this.logger.debug(`Received message from ${message.source}: ${message.type}`); switch (message.type) { case MessageType.REQUEST: this.handleRequest(message); break; case MessageType.RESPONSE: this.handleResponse(message); break; case MessageType.NOTIFICATION: this.handleNotification(message); break; default: this.logger.warn(`Unknown message type: ${message.type}`); } } private handleRequest(message: CrossServerMessage): void { // Handle request messages this.emit('request', message); } private handleResponse(message: CrossServerMessage): void { // Handle response messages const response = message.payload as CrossServerResponse; const handler = this.responseHandlers.get(response.correlationId); if (handler) { handler(response); this.responseHandlers.delete(response.correlationId); } else { this.logger.warn(`No handler found for response: ${response.correlationId}`); } } private handleNotification(message: CrossServerMessage): void { // Handle notification messages this.emit('notification', message); } private handleRegistration(registration: ServerRegistration): void { this.logger.info(`New server registered: ${registration.name}`); this.servers.set(registration.id, registration); } private handleHeartbeat(heartbeat: { serverId: string; timestamp: Date }): void { const endpoint = this.endpoints.get(heartbeat.serverId); if (endpoint) { endpoint.lastHeartbeat = heartbeat.timestamp; endpoint.isActive = true; this.logger.debug(`Heartbeat received from ${heartbeat.serverId}`); } } private handleDeregistration(deregistration: { serverId: string; serverName: string }): void { this.logger.info(`Server deregistered: ${deregistration.serverName}`); this.servers.delete(deregistration.serverId); this.endpoints.delete(deregistration.serverId); } private startHeartbeatMonitoring(): void { this.heartbeatInterval = setInterval(() => { this.checkServerHealth(); }, this.config.heartbeatInterval); } private async checkServerHealth(): Promise<void> { const now = new Date(); const staleThreshold = new Date(now.getTime() - (this.config.heartbeatInterval * 2)); for (const [serverId, endpoint] of this.endpoints) { if (endpoint.lastHeartbeat < staleThreshold) { this.logger.warn(`Server ${serverId} appears to be stale`); endpoint.isActive = false; // Attempt to reactivate await this.attemptReactivation(serverId); } } } private async attemptReactivation(serverId: string): Promise<void> { const endpoint = this.endpoints.get(serverId); if (!endpoint) return; try { // Send heartbeat request await this.sendMessage({ id: uuidv4(), type: MessageType.HEARTBEAT, source: ServerSource.UNIFIED, target: endpoint.serverType, timestamp: new Date(), payload: { timestamp: new Date() }, priority: PriorityLevel.HIGH, requiresResponse: true }); this.logger.info(`Reactivation attempt sent to ${serverId}`); } catch (error) { this.logger.error(`Failed to reactivate server ${serverId}:`, error); } } private startMessageProcessing(): void { // Process queued messages periodically setInterval(() => { this.processQueuedMessages(); }, 5000); // Process every 5 seconds } private async processQueuedMessages(): Promise<void> { for (const [target, messages] of this.messageQueue) { if (messages.length === 0) continue; const endpoint = this.endpoints.get(target); if (endpoint && endpoint.isActive) { const message = messages.shift(); if (message) { try { await this.deliverMessage(message); this.logger.debug(`Queued message delivered to ${target}`); } catch (error) { this.logger.error(`Failed to deliver queued message to ${target}:`, error); // Re-queue for later messages.unshift(message); } } } } } private queueMessage(target: ServerSource | 'broadcast', message: CrossServerMessage): void { if (!this.messageQueue.has(target)) { this.messageQueue.set(target, []); } this.messageQueue.get(target)!.push(message); } private shouldRetry(message: CrossServerMessage): boolean { // Implement retry logic based on message priority and type return message.priority === PriorityLevel.CRITICAL || message.priority === PriorityLevel.HIGH; } private async retryMessage(message: CrossServerMessage): Promise<void> { // Implement retry with exponential backoff const retryCount = (message as CrossServerMessage & { retryCount?: number }).retryCount ?? 0; if (retryCount >= this.config.maxRetries) { this.logger.error(`Max retries exceeded for message: ${message.id}`); return; } (message as CrossServerMessage & { retryCount?: number }).retryCount = retryCount + 1; // Calculate delay with exponential backoff const delay = Math.pow(2, retryCount) * 1000; setTimeout(() => { this.deliverMessage(message); }, delay); } private validateRegistration(registration: ServerRegistration): void { if (!registration.id || !registration.name || !registration.type) { throw new Error('Invalid registration: missing required fields'); } if (this.servers.has(registration.id)) { throw new Error(`Server already registered: ${registration.id}`); } } private validateMessage(message: CrossServerMessage): void { if (!message.id || !message.type || !message.source || !message.target) { throw new Error('Invalid message: missing required fields'); } } async getRegisteredServers(): Promise<ServerRegistration[]> { return Array.from(this.servers.values()); } async getServerCapabilities(serverType: ServerSource): Promise<ServerCapability[]> { const server = Array.from(this.servers.values()) .find(s => s.type === serverType); return server?.capabilities ?? []; } async isServerAvailable(serverType: ServerSource): Promise<boolean> { const endpoint = Array.from(this.endpoints.values()) .find(ep => ep.serverType === serverType); return endpoint?.isActive ?? false; } async getServerHealth(serverId: string): Promise<HealthStatus | null> { const server = this.servers.get(serverId); return server?.healthStatus ?? null; } async shutdown(): Promise<void> { this.logger.info('Shutting down Cross-Server Communication Service'); // Stop heartbeat monitoring if (this.heartbeatInterval) { clearInterval(this.heartbeatInterval); this.heartbeatInterval = null; } // Clear all handlers and queues this.responseHandlers.clear(); this.messageQueue.clear(); // Notify all servers of shutdown await this.broadcastMessage({ id: uuidv4(), type: MessageType.NOTIFICATION, source: ServerSource.UNIFIED, target: 'broadcast', timestamp: new Date(), payload: { type: 'shutdown', message: 'Communication service shutting down' }, priority: PriorityLevel.HIGH }); this.logger.info('Cross-Server Communication Service shutdown complete'); } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/cbunting99/mcp-code-analysis-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server