// Copyright 2025 Chris Bunting
// Brief: Inter-server communication protocols for MCP Code Analysis & Quality Server
// Scope: Communication layer for coordinating between analysis servers
import {
MCPRequest,
MCPResponse,
MCPError,
ServerMessage,
LoggerInterface
} from '@mcp-code-analysis/shared-types';
/**
 * Contract for exchanging {@link ServerMessage} values between servers:
 * point-to-point send, receive, broadcast, and per-event handler registration.
 */
export interface CommunicationProtocol {
/** Sends one message. The returned promise carries no value. */
send(message: ServerMessage): Promise<void>;
/** Resolves with the next message obtained by the implementation. */
receive(): Promise<ServerMessage>;
/**
 * Sends one message to every recipient the implementation knows about.
 * NOTE(review): presumably "all connected servers" — confirm per implementation.
 */
broadcast(message: ServerMessage): Promise<void>;
/** Registers `handler` under the event name `event`. */
subscribe(event: string, handler: (message: ServerMessage) => void): void;
/**
 * Removes a handler previously registered for `event`.
 * The implementation in this file matches handlers by function identity.
 */
unsubscribe(event: string, handler: (message: ServerMessage) => void): void;
}
/**
 * Directory of known servers, keyed by server id.
 * Every operation is asynchronous, so implementations are free to do I/O.
 */
export interface ServerRegistry {
/** Adds a server record to the registry. */
register(server: ServerInfo): Promise<void>;
/** Removes the record stored for `serverId`. */
unregister(serverId: string): Promise<void>;
/** Resolves with the record for `serverId`, or `null` when there is none. */
getServer(serverId: string): Promise<ServerInfo | null>;
/** Resolves with every registered server record. */
getAllServers(): Promise<ServerInfo[]>;
/** Resolves with the servers that advertise `capability`. */
findServersByCapability(capability: string): Promise<ServerInfo[]>;
}
/** Descriptive record for one server, as stored in a {@link ServerRegistry}. */
export interface ServerInfo {
/** Identifier of the server; the in-memory registry uses it as the map key. */
id: string;
/** Display name; used in this file's log messages. */
name: string;
/** Version string. No particular format is enforced in this file. */
version: string;
/** Host of the server. NOTE(review): presumably its network address — nothing in this file connects to it. */
host: string;
/** Port of the server. NOTE(review): presumably a TCP port — nothing in this file connects to it. */
port: number;
/** Capability identifiers; the in-memory registry matches them by exact string. */
capabilities: string[];
/** Current status of the server. */
status: ServerStatus;
/** Timestamp of last contact; the in-memory registry overwrites it with the registration time. */
lastSeen: Date;
/** Free-form additional data; not interpreted anywhere in this file. */
metadata: Record<string, unknown>;
}
/**
 * String-valued status of a server.
 * Within this file only `ONLINE` is given special treatment: the round-robin
 * load balancer restricts its normal selection to servers with that status.
 */
export enum ServerStatus {
ONLINE = 'online',
OFFLINE = 'offline',
BUSY = 'busy',
ERROR = 'error'
}
/** Maps request method names to handlers and dispatches requests to them. */
export interface MessageRouter {
/** Produces the response for `request`. */
route(request: MCPRequest): Promise<MCPResponse>;
/** Associates `handler` with the method name `method`. */
registerHandler(method: string, handler: (request: MCPRequest) => Promise<MCPResponse>): void;
/** Removes the handler associated with `method`. */
unregisterHandler(method: string): void;
/** Returns the registered handlers keyed by method name. */
getHandlers(): Map<string, (request: MCPRequest) => Promise<MCPResponse>>;
}
/** Publish/subscribe contract keyed by event name, carrying an untyped payload. */
export interface EventBus {
/** Publishes `data` under the event name `event`. */
emit(event: string, data: unknown): Promise<void>;
/** Registers `handler` for `event`. */
on(event: string, handler: (data: unknown) => void): void;
/** Removes a handler previously registered for `event`. */
off(event: string, handler: (data: unknown) => void): void;
/** Registers `handler` for `event` so that it is removed after being invoked. */
once(event: string, handler: (data: unknown) => void): void;
/** Removes the handlers of `event`, or of every event when `event` is omitted. */
removeAllListeners(event?: string): void;
}
/** Strategy for choosing one server from a list, plus per-server load bookkeeping. */
export interface LoadBalancer {
/**
 * Picks one server out of `servers`.
 * The return type has no "none" case; the implementation in this file throws
 * when `servers` is empty.
 */
selectServer(servers: ServerInfo[]): ServerInfo;
/** Records `load` for `serverId`. The unit and range of `load` are not defined here. */
updateServerLoad(serverId: string, load: number): void;
/** Returns the load recorded for `serverId`. */
getServerLoad(serverId: string): number;
}
/**
 * In-process {@link CommunicationProtocol} backed by a single FIFO queue.
 *
 * `send` and `broadcast` both feed the same queue (there are no remote peers
 * in this implementation) and `receive` takes messages off the front.
 * Handlers passed to `subscribe` are stored per event name, but nothing in
 * this class invokes them.
 */
export class InMemoryCommunicationProtocol implements CommunicationProtocol {
  /** Messages that arrived while no `receive()` call was waiting, oldest first. */
  private readonly messageQueue: ServerMessage[] = [];
  /** Resolvers of `receive()` calls still waiting for a message, oldest first. */
  private readonly pendingReceivers: ((message: ServerMessage) => void)[] = [];
  /** Handlers registered through `subscribe`, grouped by event name. */
  private readonly eventHandlers: Map<string, ((message: ServerMessage) => void)[]> = new Map();
  private readonly logger: LoggerInterface;

  /**
   * @param logger Receives a debug entry for every sent or broadcast message.
   */
  constructor(logger: LoggerInterface) {
    this.logger = logger;
  }

  /**
   * Makes `message` available to `receive()`.
   *
   * @param message Message to enqueue.
   */
  async send(message: ServerMessage): Promise<void> {
    this.logger.debug(`Sending message: ${message.type} - ${message.method ?? 'notification'}`);
    this.deliver(message);
  }

  /**
   * Resolves with the oldest undelivered message.
   *
   * When the queue is empty the call parks its resolver and is completed by
   * the next `send`/`broadcast`. No timer is involved, so a pending call
   * neither delays delivery nor keeps the process alive on its own.
   *
   * @returns The next message in FIFO order.
   */
  async receive(): Promise<ServerMessage> {
    const queued = this.messageQueue.shift();
    if (queued !== undefined) {
      return queued;
    }
    return new Promise<ServerMessage>((resolve) => {
      this.pendingReceivers.push(resolve);
    });
  }

  /**
   * Makes `message` available to `receive()`, exactly like `send`.
   *
   * @param message Message to enqueue.
   */
  async broadcast(message: ServerMessage): Promise<void> {
    this.logger.debug(`Broadcasting message: ${message.type}`);
    // In a real implementation, this would send to all connected servers
    this.deliver(message);
  }

  /**
   * Stores `handler` under `event`.
   *
   * @param event Event name to file the handler under.
   * @param handler Callback to store.
   */
  subscribe(event: string, handler: (message: ServerMessage) => void): void {
    const existing = this.eventHandlers.get(event);
    if (existing) {
      existing.push(handler);
    } else {
      this.eventHandlers.set(event, [handler]);
    }
  }

  /**
   * Removes the first stored occurrence of `handler` for `event`, if any.
   *
   * @param event Event name the handler was stored under.
   * @param handler The exact function instance passed to `subscribe`.
   */
  unsubscribe(event: string, handler: (message: ServerMessage) => void): void {
    const handlers = this.eventHandlers.get(event);
    if (!handlers) {
      return;
    }
    const index = handlers.indexOf(handler);
    if (index > -1) {
      handlers.splice(index, 1);
    }
  }

  /** Hands `message` to the longest-waiting receiver, or queues it when nobody is waiting. */
  private deliver(message: ServerMessage): void {
    const waiting = this.pendingReceivers.shift();
    if (waiting) {
      waiting(message);
    } else {
      this.messageQueue.push(message);
    }
  }
}
/**
 * {@link ServerRegistry} that keeps its records in a `Map` keyed by server id.
 * Nothing is persisted; the contents live only as long as the instance.
 */
export class InMemoryServerRegistry implements ServerRegistry {
  private servers = new Map<string, ServerInfo>();
  private logger: LoggerInterface;

  /**
   * @param logger Receives an info entry for every register/unregister call.
   */
  constructor(logger: LoggerInterface) {
    this.logger = logger;
  }

  /**
   * Stores a shallow copy of `server` under its id, replacing any earlier
   * record with that id. The copy's `lastSeen` is set to the current time;
   * the caller's object is left untouched.
   */
  async register(server: ServerInfo): Promise<void> {
    this.logger.info(`Registering server: ${server.name} (${server.id})`);
    const record: ServerInfo = Object.assign({}, server, { lastSeen: new Date() });
    this.servers.set(server.id, record);
  }

  /** Drops the record stored for `serverId`; unknown ids are ignored. */
  async unregister(serverId: string): Promise<void> {
    this.logger.info(`Unregistering server: ${serverId}`);
    this.servers.delete(serverId);
  }

  /** Resolves with the stored record for `serverId`, or `null` when absent. */
  async getServer(serverId: string): Promise<ServerInfo | null> {
    const record = this.servers.get(serverId);
    return record ?? null;
  }

  /** Resolves with all stored records in insertion order. */
  async getAllServers(): Promise<ServerInfo[]> {
    return [...this.servers.values()];
  }

  /** Resolves with the records whose `capabilities` contain `capability` exactly. */
  async findServersByCapability(capability: string): Promise<ServerInfo[]> {
    const offersCapability = (candidate: ServerInfo): boolean =>
      candidate.capabilities.includes(capability);
    return [...this.servers.values()].filter(offersCapability);
  }
}
/**
 * {@link MessageRouter} that dispatches on `request.method` using a `Map` of
 * handlers. Routing never rejects because of a handler: a missing handler and
 * a throwing handler are both turned into an error response carrying the
 * request id.
 */
export class DefaultMessageRouter implements MessageRouter {
  /** JSON-RPC 2.0 "Method not found" error code. */
  private static readonly METHOD_NOT_FOUND = -32601;
  /** JSON-RPC 2.0 "Internal error" error code. */
  private static readonly INTERNAL_ERROR = -32603;

  private readonly handlers: Map<string, (request: MCPRequest) => Promise<MCPResponse>> = new Map();
  private readonly logger: LoggerInterface;

  /**
   * @param logger Receives debug entries for routing and registration, and an
   *   error entry whenever a handler throws or rejects.
   */
  constructor(logger: LoggerInterface) {
    this.logger = logger;
  }

  /**
   * Runs the handler registered for `request.method`.
   *
   * @param request Incoming request.
   * @returns The handler's response; a -32601 error response when no handler
   *   is registered; a -32603 error response when the handler throws or
   *   rejects.
   */
  async route(request: MCPRequest): Promise<MCPResponse> {
    const handler = this.handlers.get(request.method);
    if (!handler) {
      return this.errorResponse(
        request,
        DefaultMessageRouter.METHOD_NOT_FOUND,
        `Method not found: ${request.method}`
      );
    }
    try {
      this.logger.debug(`Routing request: ${request.method}`);
      return await handler(request);
    } catch (err) {
      // The response only carries the message text, so log the original error
      // (with its stack) before it is flattened.
      this.logger.error(`Error in handler for ${request.method}:`, err);
      return this.errorResponse(
        request,
        DefaultMessageRouter.INTERNAL_ERROR,
        `Internal error: ${err instanceof Error ? err.message : 'Unknown error'}`
      );
    }
  }

  /**
   * Registers `handler` for `method`, replacing any handler already
   * registered under that name.
   */
  registerHandler(method: string, handler: (request: MCPRequest) => Promise<MCPResponse>): void {
    this.logger.debug(`Registering handler for method: ${method}`);
    this.handlers.set(method, handler);
  }

  /** Removes the handler for `method`; unknown names are ignored. */
  unregisterHandler(method: string): void {
    this.logger.debug(`Unregistering handler for method: ${method}`);
    this.handlers.delete(method);
  }

  /**
   * @returns A copy of the handler table; mutating it does not affect routing.
   */
  getHandlers(): Map<string, (request: MCPRequest) => Promise<MCPResponse>> {
    return new Map(this.handlers);
  }

  /** Builds an error response for `request` stamped with the current time. */
  private errorResponse(request: MCPRequest, code: number, message: string): MCPResponse {
    const error: MCPError = { code, message };
    return {
      id: request.id,
      error,
      timestamp: new Date()
    };
  }
}
/**
 * In-process {@link EventBus}. Handlers are kept per event name in
 * registration order and are invoked synchronously, in that order, by `emit`.
 */
export class InMemoryEventBus implements EventBus {
  private readonly eventHandlers: Map<string, ((data: unknown) => void)[]> = new Map();
  /** Maps each wrapper created by `once` back to the caller's handler so `off` can find it. */
  private readonly onceOriginals: WeakMap<(data: unknown) => void, (data: unknown) => void> =
    new WeakMap();
  private readonly logger: LoggerInterface;

  /**
   * @param logger Receives a debug entry per emitted event and an error entry
   *   for every handler that throws.
   */
  constructor(logger: LoggerInterface) {
    this.logger = logger;
  }

  /**
   * Invokes every handler registered for `event` with `data`.
   *
   * A throwing handler is logged and does not stop the remaining handlers.
   * Handlers are taken from a snapshot made when the call starts, so handlers
   * that add or remove listeners (including `once` handlers removing
   * themselves) cannot cause another handler to be skipped.
   *
   * @param event Event name.
   * @param data Payload passed to each handler unchanged.
   */
  async emit(event: string, data: unknown): Promise<void> {
    this.logger.debug(`Emitting event: ${event}`);
    const handlers = this.eventHandlers.get(event);
    if (!handlers) {
      return;
    }
    // Iterate a copy: off()/once() splice the live array during dispatch.
    for (const handler of [...handlers]) {
      try {
        handler(data);
      } catch (err) {
        this.logger.error(`Error in event handler for ${event}:`, err);
      }
    }
  }

  /**
   * Appends `handler` to the handlers of `event`. Registering the same
   * function twice makes it run twice per emit.
   */
  on(event: string, handler: (data: unknown) => void): void {
    const existing = this.eventHandlers.get(event);
    if (existing) {
      existing.push(handler);
    } else {
      this.eventHandlers.set(event, [handler]);
    }
  }

  /**
   * Removes the first registration of `handler` for `event`. Matches by
   * function identity and also finds handlers that were registered through
   * `once`. Unknown events or handlers are ignored.
   */
  off(event: string, handler: (data: unknown) => void): void {
    const handlers = this.eventHandlers.get(event);
    if (!handlers) {
      return;
    }
    const index = handlers.findIndex(
      (registered) => registered === handler || this.onceOriginals.get(registered) === handler
    );
    if (index === -1) {
      return;
    }
    handlers.splice(index, 1);
    // Drop the entry so event names that are no longer used do not accumulate.
    if (handlers.length === 0) {
      this.eventHandlers.delete(event);
    }
  }

  /**
   * Registers `handler` to run for the next emit of `event` only.
   * The registration is removed before `handler` runs, so it is gone even
   * when `handler` throws.
   */
  once(event: string, handler: (data: unknown) => void): void {
    const onceHandler = (data: unknown): void => {
      this.off(event, onceHandler);
      handler(data);
    };
    this.onceOriginals.set(onceHandler, handler);
    this.on(event, onceHandler);
  }

  /**
   * Removes every handler of `event`, or of all events when `event` is
   * omitted (or is an empty string, which is treated as omitted).
   */
  removeAllListeners(event?: string): void {
    if (event) {
      this.eventHandlers.delete(event);
    } else {
      this.eventHandlers.clear();
    }
  }
}
/**
 * {@link LoadBalancer} that cycles through the servers whose status is
 * `ONLINE`. Reported loads are stored and can be read back, but they play no
 * part in the selection.
 */
export class RoundRobinLoadBalancer implements LoadBalancer {
  private serverLoads = new Map<string, number>();
  private currentIndex = 0;
  private logger: LoggerInterface;

  /**
   * @param logger Receives debug entries for selections and load updates, and
   *   a warning when no server is online.
   */
  constructor(logger: LoggerInterface) {
    this.logger = logger;
  }

  /**
   * Chooses the next server in rotation.
   *
   * @param servers Candidates to choose from.
   * @returns The only server when exactly one is given (its status is not
   *   inspected); otherwise the next `ONLINE` server in rotation, or the first
   *   server when none is `ONLINE`.
   * @throws Error when `servers` is empty.
   */
  selectServer(servers: ServerInfo[]): ServerInfo {
    const total = servers.length;
    if (total === 0) {
      throw new Error('No servers available');
    }
    if (total === 1) {
      return servers[0];
    }

    // Only servers that report ONLINE take part in the rotation.
    const isOnline = (candidate: ServerInfo): boolean =>
      candidate.status === ServerStatus.ONLINE;
    const rotation = servers.filter(isOnline);
    const rotationSize = rotation.length;
    if (rotationSize === 0) {
      this.logger.warn('No available servers, returning first server');
      return servers[0];
    }

    // Take the current slot, then advance the cursor with wrap-around.
    const slot = this.currentIndex % rotationSize;
    const chosen = rotation[slot];
    this.currentIndex = (slot + 1) % rotationSize;
    this.logger.debug(`Selected server: ${chosen.name} (${chosen.id})`);
    return chosen;
  }

  /** Stores `load` as the latest load reported for `serverId`. */
  updateServerLoad(serverId: string, load: number): void {
    this.serverLoads.set(serverId, load);
    this.logger.debug(`Updated load for server ${serverId}: ${load}`);
  }

  /** Returns the latest load stored for `serverId`, or 0 when none was reported. */
  getServerLoad(serverId: string): number {
    const recorded = this.serverLoads.get(serverId);
    return recorded ?? 0;
  }
}
/**
 * Bundle of the communication components defined in this module, one per role.
 */
export interface CommunicationManager {
/** Message transport. */
protocol: CommunicationProtocol;
/** Directory of known servers. */
registry: ServerRegistry;
/** Request-to-handler dispatcher. */
router: MessageRouter;
/** Publish/subscribe channel. */
eventBus: EventBus;
/** Server selection strategy. */
loadBalancer: LoadBalancer;
}
/**
 * Builds a {@link CommunicationManager} out of this module's in-memory
 * implementations.
 *
 * @param logger Logger handed to every component.
 * @returns A manager whose five components are freshly constructed instances.
 */
export function createCommunicationManager(
  logger: LoggerInterface
): CommunicationManager {
  const protocol = new InMemoryCommunicationProtocol(logger);
  const registry = new InMemoryServerRegistry(logger);
  const router = new DefaultMessageRouter(logger);
  const eventBus = new InMemoryEventBus(logger);
  const loadBalancer = new RoundRobinLoadBalancer(logger);

  return { protocol, registry, router, eventBus, loadBalancer };
}
/**
 * Predefined event names for inter-server communication.
 * Every value has the form `<domain>:<action>`. `as const` keeps each value's
 * literal string type instead of widening it to `string`.
 */
export const ServerEvents = {
// Server lifecycle
SERVER_REGISTERED: 'server:registered',
SERVER_UNREGISTERED: 'server:unregistered',
SERVER_STATUS_CHANGED: 'server:status_changed',
// Analysis results
ANALYSIS_COMPLETED: 'analysis:completed',
ANALYSIS_FAILED: 'analysis:failed',
DEPENDENCY_UPDATED: 'dependency:updated',
COMPLEXITY_CALCULATED: 'complexity:calculated',
// Cache and configuration
CACHE_INVALIDATED: 'cache:invalidated',
CONFIG_UPDATED: 'config:updated'
} as const;
/**
 * Predefined method names for inter-server communication.
 * Every value has the form `<prefix>:<method>`, where the prefix is one of
 * `static-analysis`, `dependency-analysis`, `complexity-analyzer`, or
 * `common`. `as const` keeps each value's literal string type instead of
 * widening it to `string`.
 */
export const ServerMethods = {
// Static Analysis methods
ANALYZE_FILE: 'static-analysis:analyze_file',
ANALYZE_PROJECT: 'static-analysis:analyze_project',
GET_ANALYSIS_RULES: 'static-analysis:get_rules',
CONFIGURE_ANALYZER: 'static-analysis:configure_analyzer',
// Dependency Analysis methods
ANALYZE_DEPENDENCIES: 'dependency-analysis:analyze_dependencies',
CHECK_UPDATES: 'dependency-analysis:check_updates',
FIND_CONFLICTS: 'dependency-analysis:find_conflicts',
SECURITY_AUDIT: 'dependency-analysis:security_audit',
// Complexity Analysis methods
ANALYZE_COMPLEXITY: 'complexity-analyzer:analyze_complexity',
SUGGEST_REFACTORINGS: 'complexity-analyzer:suggest_refactorings',
CALCULATE_METRICS: 'complexity-analyzer:calculate_metrics',
IDENTIFY_HOTSPOTS: 'complexity-analyzer:identify_hotspots',
// Common methods
PING: 'common:ping',
GET_STATUS: 'common:get_status',
GET_CAPABILITIES: 'common:get_capabilities',
RELOAD_CONFIG: 'common:reload_config'
} as const;