Skip to main content
Glama
setup-manager.ts (10.3 kB)
import { EventEmitter } from "events";
import { indexBy, stringifyEq } from "@mcpx/toolkit-core/data";
import { loggableError } from "@mcpx/toolkit-core/logging";
import {
  McpxBoundPayloads,
  targetServerSchema,
  WebappBoundPayloadOf,
} from "@mcpx/webapp-protocol/messages";
import { Logger } from "winston";
import z from "zod/v4";
import { ConfigService } from "../config.js";
import { Config } from "../model/config/config.js";
import { ServiceToolGroup } from "../model/config/permissions.js";
import { TargetServer } from "../model/target-servers.js";
import { TargetClients } from "./target-clients.js";

type ApplySetupPayload = z.infer<typeof McpxBoundPayloads.applySetup>;
type SetupConfigPayload = ApplySetupPayload["config"];

/**
 * Contract for the component that applies Hub-provided setups and builds
 * "setup-change" payloads for user-initiated changes.
 */
export interface SetupManagerI {
  /**
   * Applies the target servers and config carried by `payload` and resolves
   * with a "setup-change" payload describing the resulting state.
   */
  applySetup(
    payload: ApplySetupPayload,
  ): Promise<WebappBoundPayloadOf<"setup-change">>;
  /** Whether an `applySetup` call is currently in progress. */
  isDigesting(): boolean;
  /**
   * Builds a user-sourced "setup-change" payload for a new config, or returns
   * `null` when the config is unchanged.
   */
  buildUserConfigChangePayload(
    config: Config,
  ): WebappBoundPayloadOf<"setup-change"> | null;
  /**
   * Builds a user-sourced "setup-change" payload for a new set of target
   * servers, or returns `null` when the set is unchanged.
   */
  buildUserTargetServersChangePayload(
    servers: TargetServer[],
  ): WebappBoundPayloadOf<"setup-change"> | null;
}

/**
 * Tracks the last known setup (target servers + config), applies setups
 * received from the Hub, and reports changes as "setup-change" payloads.
 *
 * Only one `applySetup` runs at a time: concurrent callers wait until the
 * running one has finished (successfully or not) before starting.
 */
export class SetupManager implements SetupManagerI {
  private logger: Logger;
  // True while an applySetup call is applying (or rolling back) a setup.
  private _isDigesting = false;
  // Emits "complete" every time digest mode is exited.
  private digestCompleteEmitter = new EventEmitter<{ complete: [] }>();
  // Last setup reported/applied; `null` until the first change is observed.
  private currentSetup: Omit<
    WebappBoundPayloadOf<"setup-change">,
    "source"
  > | null = null;

  constructor(
    private targetClients: TargetClients,
    private configService: ConfigService,
    logger: Logger,
  ) {
    this.logger = logger.child({ component: "SetupManager" });
  }

  /** Returns `true` while an `applySetup` call is in progress. */
  isDigesting(): boolean {
    return this._isDigesting;
  }

  /**
   * Hook to run when a user-initiated target-servers change occurs.
   *
   * @param servers - The full, current list of target servers.
   * @returns A payload with `source: "user"`, or `null` if the servers
   *   (indexed by name) are equal to the tracked ones per `stringifyEq`.
   */
  buildUserTargetServersChangePayload(
    servers: TargetServer[],
  ): WebappBoundPayloadOf<"setup-change"> | null {
    // Build new setup payload
    const targetServerRecord = indexBy(servers, (ts) => ts.name);

    // Check if changed
    if (stringifyEq(this.currentSetup?.targetServers, targetServerRecord)) {
      return null;
    }

    // Update state; keep the tracked config, falling back to defaults when
    // nothing has been tracked yet.
    this.currentSetup = {
      targetServers: targetServerRecord,
      config: this.currentSetup?.config ?? this.getDefaultConfig(),
    };

    return {
      source: "user",
      ...this.currentSetup,
    };
  }

  /**
   * Hook to run when a user-initiated config change occurs.
   *
   * @param config - The new config.
   * @returns A payload with `source: "user"`, or `null` if the normalized
   *   config is equal to the tracked one per `stringifyEq`.
   */
  buildUserConfigChangePayload(
    config: Config,
  ): WebappBoundPayloadOf<"setup-change"> | null {
    // Convert config to protocol format
    const normalizedConfig = this.normalizeConfig(config);

    // Check if changed
    if (stringifyEq(this.currentSetup?.config, normalizedConfig)) {
      return null;
    }

    // Update state; keep the tracked target servers, falling back to an empty
    // record when nothing has been tracked yet.
    this.currentSetup = {
      targetServers: this.currentSetup?.targetServers ?? {},
      config: normalizedConfig,
    };

    return {
      source: "user",
      ...this.currentSetup,
    };
  }

  /**
   * Applies a setup received from the Hub: replaces the target servers, then
   * updates the config.
   *
   * If either step fails, a best-effort rollback to the state captured at the
   * start of the call is attempted and the original error is rethrown.
   *
   * @param payload - The setup to apply.
   * @returns A payload with `source: "hub"` reflecting the state after apply.
   * @throws The error raised while applying target servers or config.
   */
  async applySetup(
    payload: ApplySetupPayload,
  ): Promise<WebappBoundPayloadOf<"setup-change">> {
    this.logger.info("Applying setup from Hub", {
      source: payload.source,
      setupId: payload.setupId,
    });

    // Wait for any ongoing digest to complete.
    // This must be a loop, not a single check: one "complete" event wakes
    // every waiter, and only the first to resume may enter digest mode. The
    // others observe the flag set again and go back to waiting.
    while (this._isDigesting) {
      this.logger.info("Waiting for ongoing digest to complete...");
      await new Promise<void>((resolve) =>
        this.digestCompleteEmitter.once("complete", resolve),
      );
    }

    // Anchor current state (to be restored in case of failure). Captured
    // before entering digest mode so that a failure here cannot leave the
    // digest flag stuck; there is no await between this and setting the flag.
    const currentTargetServers = indexBy(
      this.targetClients.servers,
      (ts) => ts.name,
    );
    const currentConfig = this.normalizeConfig(this.configService.getConfig());

    // Enter digest mode
    this._isDigesting = true;

    try {
      await this.applyTargetServers(payload.targetServers);
      await this.applyConfig(payload.config);

      this.logger.info("Successfully applied setup", {
        source: payload.source,
        setupId: payload.setupId,
      });
    } catch (e) {
      this.logger.error("Failed to apply setup", {
        source: payload.source,
        setupId: payload.setupId,
        error: loggableError(e),
      });

      try {
        await this.rollbackSetup(currentTargetServers, currentConfig);
      } catch (rollbackError) {
        // Rollback is best-effort: log and fall through so the caller still
        // receives the original failure.
        this.logger.error("Failed to roll back setup", {
          source: payload.source,
          setupId: payload.setupId,
          error: loggableError(rollbackError),
        });
      } finally {
        // Clean up digest state on failure, even if the rollback itself threw;
        // otherwise every later applySetup call would wait forever.
        this.exitDigestMode();
      }

      throw e;
    }

    // Exit digest mode and build consolidated payload
    this.exitDigestMode();
    return this.buildDigestedSetupPayload();
  }

  /** Leaves digest mode and wakes any `applySetup` callers waiting on it. */
  private exitDigestMode(): void {
    this._isDigesting = false;
    this.digestCompleteEmitter.emit("complete");
  }

  // Rollback philosophy is that we try to restore previous state as best as we can,
  // but if we fail to restore either target-servers or config, we log the error and move on.
  // We do not attempt to retry restoration or fail the entire process, as that could lead to
  // infinite loops or other complications. The goal is to be resilient and maintain service availability,
  // even if the exact previous state cannot be fully restored.
  private async rollbackSetup(
    targetServers: Record<string, z.infer<typeof targetServerSchema>>,
    config: SetupConfigPayload,
  ): Promise<void> {
    // Attempt to restore previous state
    try {
      this.logger.info("Restoring previous target servers", {
        count: Object.keys(targetServers).length,
      });
      await this.applyTargetServers(targetServers);
    } catch (restoreError) {
      this.logger.error(
        "Failed to restore target servers after setup failure",
        {
          error: loggableError(restoreError),
        },
      );
    }

    try {
      this.logger.info("Restoring previous config");
      await this.applyConfig(config);
    } catch (restoreError) {
      this.logger.error("Failed to restore config after setup failure", {
        error: loggableError(restoreError),
      });
    }

    // Update currentSetup to reflect actual state after restoration attempt
    this.currentSetup = {
      targetServers: indexBy(this.targetClients.servers, (ts) => ts.name),
      config: this.normalizeConfig(this.configService.getConfig()),
    };
  }

  /**
   * Re-reads target servers and config from their owners, stores them as the
   * tracked setup, and returns them as a Hub-sourced payload.
   */
  private buildDigestedSetupPayload(): WebappBoundPayloadOf<"setup-change"> {
    this.logger.info("Building digested setup payload");

    // Get current state (reflects actual applied state)
    const currentTargetServers = indexBy(
      this.targetClients.servers,
      (ts) => ts.name,
    );
    const currentConfig = this.normalizeConfig(this.configService.getConfig());

    // Update state
    this.currentSetup = {
      targetServers: currentTargetServers,
      config: currentConfig,
    };

    // Return payload with source="hub" (Hub-initiated change)
    return {
      source: "hub",
      ...this.currentSetup,
    };
  }

  /**
   * Replaces the current target servers with the ones in the payload: removes
   * every current client, then adds every incoming server.
   *
   * @throws Error naming the servers that could not be added, if any.
   */
  private async applyTargetServers(
    targetServersPayload: Record<string, z.infer<typeof targetServerSchema>>,
  ): Promise<void> {
    // Convert payload to TargetServer[]
    const incomingServers: TargetServer[] = Object.entries(
      targetServersPayload,
    ).map(([name, config]) => ({ name, ...config }));

    this.logger.info("Applying target servers", {
      count: incomingServers.length,
    });

    // Get current server names
    const currentServerNames = Array.from(
      this.targetClients.clientsByService.keys(),
    );

    // Remove all current servers
    this.logger.debug("Removing current target servers", {
      count: currentServerNames.length,
    });
    await Promise.all(
      currentServerNames.map((name) => this.targetClients.removeClient(name)),
    );

    // Add all incoming servers. allSettled lets every add run to completion so
    // that all failures can be reported together.
    this.logger.debug("Adding incoming target servers", {
      count: incomingServers.length,
    });
    const results = await Promise.allSettled(
      incomingServers.map((server) => this.targetClients.addClient(server)),
    );

    // Pair each result with its server (results keep the input order).
    const failures = results
      .map((r, i) => ({ result: r, server: incomingServers[i] }))
      .filter(({ result }) => result.status === "rejected");

    if (failures.length > 0) {
      const failedServers = failures.map(({ server }) => server?.name);
      this.logger.error("Failed to add some target servers", {
        failedServers,
        failureCount: failures.length,
        totalCount: incomingServers.length,
      });
      throw new Error(
        `Failed to add ${failures.length}/${incomingServers.length} target servers: ${failedServers.join(", ")}`,
      );
    }

    this.logger.info("Target servers applied successfully");
  }

  /**
   * Fills in defaults for missing config sections and updates the config
   * through the config service while holding its lock.
   */
  private async applyConfig(incomingConfig: SetupConfigPayload): Promise<void> {
    this.logger.info("Applying config");

    await this.configService.withLock(async () => {
      await this.configService.updateConfig(fillInConfig(incomingConfig));
    });

    this.logger.info("Config applied successfully");
  }

  /**
   * Converts a `Config` to the protocol shape: each tool group is reduced to
   * its name and a per-service list of tool names; other fields are copied.
   */
  private normalizeConfig(config: Config): SetupConfigPayload {
    const normalizedToolGroups = config.toolGroups?.map((toolGroup) => ({
      name: toolGroup.name,
      services: Object.fromEntries(
        Object.entries(toolGroup.services).map(([serviceName, markedTools]) => [
          serviceName,
          this.normalizeMarkedTools(serviceName, markedTools),
        ]),
      ),
    }));

    return {
      ...config,
      toolGroups: normalizedToolGroups,
    };
  }

  /**
   * Returns the marked tools as a list of names. The "*" wildcard is not
   * expanded yet: it is logged and replaced by an empty list.
   */
  private normalizeMarkedTools(
    serviceName: string,
    markedTools: ServiceToolGroup,
  ): string[] {
    if (markedTools === "*") {
      this.logger.warn(
        "'*' found as marked tools, expansion will be implemented later, returning empty list for now",
        { serviceName },
      );
      return [];
    }
    return markedTools;
  }

  /** Config used when a target-servers change arrives before any config is tracked. */
  private getDefaultConfig(): NonNullable<SetupConfigPayload> {
    return {
      toolGroups: [],
      toolExtensions: { services: {} },
      staticOauth: undefined,
      permissions: { default: { block: [] }, consumers: {} },
      auth: { enabled: false },
      targetServerAttributes: {},
    };
  }
}

/**
 * Builds a complete `Config` from a setup config payload, substituting an
 * empty/disabled default for every section the payload leaves unset
 * (`staticOauth` is passed through as-is).
 */
function fillInConfig(partialConfig: SetupConfigPayload): Config {
  return {
    toolGroups: partialConfig.toolGroups ?? [],
    toolExtensions: partialConfig.toolExtensions ?? { services: {} },
    staticOauth: partialConfig.staticOauth,
    permissions: partialConfig.permissions ?? {
      default: { block: [] },
      consumers: {},
    },
    auth: partialConfig.auth ?? { enabled: false },
    targetServerAttributes: partialConfig.targetServerAttributes ?? {},
  };
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TheLunarCompany/lunar'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.