Skip to main content
Glama

CodeGraph CLI MCP Server

by Jakedismo
message-router.ts14 kB
import { EventEmitter } from 'events'; import { JsonRpcMessage } from '@modelcontextprotocol/sdk/types.js'; import { MessageValidator, ValidationResult } from '../validation/message-validator.js'; export interface RoutePattern { method: string; version?: string; priority?: number; conditions?: RouteCondition[]; } export interface RouteCondition { field: string; operator: 'eq' | 'ne' | 'gt' | 'lt' | 'in' | 'matches' | 'exists'; value: any; } export interface RouteHandler { handle(message: JsonRpcMessage, context: RouteContext): Promise<RouteResult>; canHandle?(message: JsonRpcMessage, context: RouteContext): boolean; } export interface RouteContext { sessionId: string; agentId?: string; metadata: Record<string, any>; timestamp: Date; correlationId?: string; } export interface RouteResult { success: boolean; response?: JsonRpcMessage; error?: string; forward?: ForwardInstruction; metadata?: Record<string, any>; } export interface ForwardInstruction { targetAgents?: string[]; targetSessions?: string[]; broadcast?: boolean; delay?: number; retries?: number; } export interface LoadBalancingStrategy { selectTarget(candidates: string[], context: RouteContext): string | null; } export interface FailoverStrategy { getAlternatives(primary: string, context: RouteContext): string[]; } export class MessageRouter extends EventEmitter { private routes = new Map<string, RouteHandler[]>(); private validator: MessageValidator; private loadBalancer?: LoadBalancingStrategy; private failoverStrategy?: FailoverStrategy; private metrics = { messagesRouted: 0, routingErrors: 0, averageLatency: 0, lastRouted: null as Date | null }; constructor(validator?: MessageValidator) { super(); this.validator = validator || new MessageValidator(); } public addRoute(pattern: RoutePattern, handler: RouteHandler): void { const key = this.createRouteKey(pattern); if (!this.routes.has(key)) { this.routes.set(key, []); } const handlers = this.routes.get(key)!; // Insert handler based on priority (higher priority first) const priority = pattern.priority || 0; const insertIndex = handlers.findIndex(h => (h as any).priority < priority); if (insertIndex === -1) { handlers.push(handler); } else { handlers.splice(insertIndex, 0, handler); } (handler as any).priority = priority; (handler as any).pattern = pattern; this.emit('route:added', { pattern, handler }); } public removeRoute(pattern: RoutePattern, handler?: RouteHandler): boolean { const key = this.createRouteKey(pattern); const handlers = this.routes.get(key); if (!handlers) return false; if (handler) { const index = handlers.indexOf(handler); if (index !== -1) { handlers.splice(index, 1); if (handlers.length === 0) { this.routes.delete(key); } this.emit('route:removed', { pattern, handler }); return true; } } else { // Remove all handlers for this pattern const removed = handlers.length; this.routes.delete(key); this.emit('route:removed', { pattern, count: removed }); return removed > 0; } return false; } public async route(message: JsonRpcMessage, context: RouteContext): Promise<RouteResult> { const startTime = Date.now(); try { // Validate message const validation = this.validator.validateMessage(message); if (!validation.valid) { this.metrics.routingErrors++; return { success: false, error: `Message validation failed: ${validation.errors?.join(', ')}`, response: this.createErrorResponse(message.id, -32602, 'Invalid params') }; } // Find matching handlers const handlers = this.findHandlers(message, context); if (handlers.length === 0) { this.emit('route:no_handler', { message, context }); return { success: false, error: `No handler found for method: ${message.method}`, response: this.createErrorResponse(message.id, -32601, 'Method not found') }; } // Try handlers in priority order for (const handler of handlers) { try { if (handler.canHandle && !handler.canHandle(message, context)) { continue; } const result = await handler.handle(message, context); if (result.success) { this.updateMetrics(startTime); this.emit('route:success', { message, context, result, handler }); return result; } else if (result.error) { this.emit('route:handler_error', { message, context, error: result.error, handler }); } } catch (error) { this.emit('route:handler_exception', { message, context, error, handler }); continue; // Try next handler } } this.metrics.routingErrors++; return { success: false, error: 'All handlers failed', response: this.createErrorResponse(message.id, -32603, 'Internal error') }; } catch (error) { this.metrics.routingErrors++; this.emit('route:error', { message, context, error }); return { success: false, error: `Routing failed: ${error}`, response: this.createErrorResponse(message.id, -32603, 'Internal error') }; } } public async routeWithLoadBalancing( message: JsonRpcMessage, context: RouteContext, candidates: string[] ): Promise<RouteResult> { if (!this.loadBalancer || candidates.length === 0) { return this.route(message, context); } const target = this.loadBalancer.selectTarget(candidates, context); if (!target) { return { success: false, error: 'Load balancer could not select target' }; } const targetContext = { ...context, targetAgent: target }; return this.route(message, targetContext); } public async routeWithFailover( message: JsonRpcMessage, context: RouteContext, primary: string ): Promise<RouteResult> { // Try primary target first const primaryContext = { ...context, targetAgent: primary }; const primaryResult = await this.route(message, primaryContext); if (primaryResult.success) { return primaryResult; } // Get alternatives from failover strategy if (!this.failoverStrategy) { return primaryResult; } const alternatives = this.failoverStrategy.getAlternatives(primary, context); for (const alternative of alternatives) { const altContext = { ...context, targetAgent: alternative }; const altResult = await this.route(message, altContext); if (altResult.success) { this.emit('route:failover_success', { message, context, primary, alternative, result: altResult }); return altResult; } } this.emit('route:failover_exhausted', { message, context, primary, alternatives }); return primaryResult; // Return original error } public async broadcastToGroup( group: string, message: JsonRpcMessage, context: RouteContext ): Promise<RouteResult[]> { const groupContext = { ...context, targetGroup: group }; // This would typically involve looking up group members // For now, we'll emit an event that the transport layer can handle this.emit('route:broadcast', { group, message, context: groupContext }); return [{ success: true, metadata: { broadcast: true, group } }]; } private findHandlers(message: JsonRpcMessage, context: RouteContext): RouteHandler[] { const handlers: RouteHandler[] = []; for (const [routeKey, routeHandlers] of this.routes) { if (this.matchesRoute(routeKey, message, context)) { handlers.push(...routeHandlers); } } // Sort by priority (higher first) return handlers.sort((a, b) => ((b as any).priority || 0) - ((a as any).priority || 0)); } private matchesRoute(routeKey: string, message: JsonRpcMessage, context: RouteContext): boolean { const [method, version] = routeKey.split('@'); if (method !== '*' && method !== message.method) { return false; } if (version && version !== '*') { // Version matching logic would go here // For now, we'll assume no version means any version } // Get the pattern conditions for additional matching const routeHandlers = this.routes.get(routeKey); if (!routeHandlers || routeHandlers.length === 0) { return false; } const pattern = (routeHandlers[0] as any).pattern as RoutePattern; if (pattern?.conditions) { return this.evaluateConditions(pattern.conditions, message, context); } return true; } private evaluateConditions( conditions: RouteCondition[], message: JsonRpcMessage, context: RouteContext ): boolean { for (const condition of conditions) { if (!this.evaluateCondition(condition, message, context)) { return false; } } return true; } private evaluateCondition( condition: RouteCondition, message: JsonRpcMessage, context: RouteContext ): boolean { const value = this.getFieldValue(condition.field, message, context); switch (condition.operator) { case 'eq': return value === condition.value; case 'ne': return value !== condition.value; case 'gt': return value > condition.value; case 'lt': return value < condition.value; case 'in': return Array.isArray(condition.value) && condition.value.includes(value); case 'matches': return typeof value === 'string' && new RegExp(condition.value).test(value); case 'exists': return value !== undefined && value !== null; default: return false; } } private getFieldValue(field: string, message: JsonRpcMessage, context: RouteContext): any { const parts = field.split('.'); let current: any; if (parts[0] === 'message') { current = message; parts.shift(); } else if (parts[0] === 'context') { current = context; parts.shift(); } else { current = message; } for (const part of parts) { if (current && typeof current === 'object') { current = current[part]; } else { return undefined; } } return current; } private createRouteKey(pattern: RoutePattern): string { return `${pattern.method}@${pattern.version || '*'}`; } private createErrorResponse(id: any, code: number, message: string): JsonRpcMessage { return { jsonrpc: '2.0', error: { code, message }, id }; } private updateMetrics(startTime: number): void { const latency = Date.now() - startTime; this.metrics.messagesRouted++; this.metrics.lastRouted = new Date(); // Simple moving average for latency this.metrics.averageLatency = (this.metrics.averageLatency * 0.9) + (latency * 0.1); } public setLoadBalancer(strategy: LoadBalancingStrategy): void { this.loadBalancer = strategy; } public setFailoverStrategy(strategy: FailoverStrategy): void { this.failoverStrategy = strategy; } public getRoutes(): Array<{ pattern: RoutePattern; handlers: number }> { const routes: Array<{ pattern: RoutePattern; handlers: number }> = []; for (const [key, handlers] of this.routes) { if (handlers.length > 0) { const pattern = (handlers[0] as any).pattern as RoutePattern; routes.push({ pattern, handlers: handlers.length }); } } return routes; } public getMetrics() { return { ...this.metrics }; } public clearRoutes(): void { this.routes.clear(); this.emit('routes:cleared'); } public getHandlerCount(): number { let count = 0; for (const handlers of this.routes.values()) { count += handlers.length; } return count; } } // Built-in load balancing strategies export class RoundRobinLoadBalancer implements LoadBalancingStrategy { private currentIndex = 0; selectTarget(candidates: string[], context: RouteContext): string | null { if (candidates.length === 0) return null; const target = candidates[this.currentIndex % candidates.length]; this.currentIndex = (this.currentIndex + 1) % candidates.length; return target; } } export class RandomLoadBalancer implements LoadBalancingStrategy { selectTarget(candidates: string[], context: RouteContext): string | null { if (candidates.length === 0) return null; const index = Math.floor(Math.random() * candidates.length); return candidates[index]; } } export class WeightedLoadBalancer implements LoadBalancingStrategy { constructor(private weights: Map<string, number> = new Map()) {} selectTarget(candidates: string[], context: RouteContext): string | null { if (candidates.length === 0) return null; // If no weights configured, use random selection if (this.weights.size === 0) { return candidates[Math.floor(Math.random() * candidates.length)]; } const totalWeight = candidates.reduce((sum, candidate) => sum + (this.weights.get(candidate) || 1), 0 ); let random = Math.random() * totalWeight; for (const candidate of candidates) { const weight = this.weights.get(candidate) || 1; random -= weight; if (random <= 0) { return candidate; } } return candidates[0]; // Fallback } setWeight(agent: string, weight: number): void { this.weights.set(agent, Math.max(0, weight)); } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Jakedismo/codegraph-rust'

If you have feedback or need assistance with the MCP directory API, please join our Discord server