Skip to main content
Glama
target-clients.ts17.4 kB
import { makeError } from "@mcpx/toolkit-core/data"; import { loggableError, LunarLogger } from "@mcpx/toolkit-core/logging"; import { env } from "../env.js"; import { AlreadyExistsError, FailedToConnectToTargetServer, InvalidSchemaError, NotFoundError, } from "../errors.js"; import { LOG_FLAGS } from "../log-flags.js"; import { RemoteTargetServer, TargetServer } from "../model/target-servers.js"; import { ExtendedClientI } from "./client-extension.js"; import { sanitizeTargetServerForTelemetry } from "./control-plane-service.js"; import { isAuthenticationError, OAuthConnectionHandler, } from "./oauth-connection-handler.js"; import { ServerConfigManager } from "./server-config-manager.js"; import { SystemStateTracker, TargetServerNewWithoutUsage, } from "./system-state.js"; import { buildClient, TargetServerConnectionFactory, } from "./target-server-connection-factory.js"; interface ConnectedTargetClient { _state: "connected"; targetServer: TargetServer; extendedClient: ExtendedClientI; } interface PendingAuthTargetClient { _state: "pending-auth"; targetServer: RemoteTargetServer; } interface ConnectionFailedTargetClient { _state: "connection-failed"; targetServer: TargetServer; error: Error; } type TargetClient = | ConnectedTargetClient | PendingAuthTargetClient | ConnectionFailedTargetClient; // This class manages connections to target MCP servers, via initializing // `Client` instances, extending them into `ExtendedClient` instances, // storing them in a map, and providing methods to add, remove, and list clients. export class TargetClients { private _clientsByService: Map<string, TargetClient> = new Map(); private targetServers: TargetServer[] = []; private initialized = false; private postChangeHook: ((servers: TargetServer[]) => void) | null = null; constructor( private systemState: SystemStateTracker, private serverConfigManager: ServerConfigManager, private connectionFactory: TargetServerConnectionFactory, private oauthConnectionHandler: OAuthConnectionHandler, private logger: LunarLogger, ) { this.logger = logger.child({ component: "TargetClients" }); } get servers(): TargetServer[] { return this.targetServers; } // TODO: Support adding multiple hooks registerPostChangeHook(hook: (servers: TargetServer[]) => void): void { this.postChangeHook = hook; } async initialize(): Promise<void> { if (env.READ_TARGET_SERVERS_FROM_FILE) { try { this.targetServers = this.serverConfigManager.readTargetServers(); if (this.targetServers.length > 0) { const telemetryServers = Object.fromEntries( this.targetServers.map((server) => [ server.name, sanitizeTargetServerForTelemetry(server), ]), ); this.logger.telemetry.info("target servers loaded", { mcpServers: telemetryServers, }); } // Clear any previous config error on successful load this.systemState.clearConfigError(); } catch (e) { if (e instanceof InvalidSchemaError) { // Log the error and set config error in system state this.logger.error("Configuration validation failed", { error: e.message, }); this.systemState.setConfigError(e.message); this.targetServers = []; } } } this.logger.info("Initializing TargetClients with servers", { count: this.targetServers.length, }); await this.reloadClients(); this.logger.info("TargetClients initialized", { count: this._clientsByService.size, }); this.initialized = true; } get clientsByService(): Map<string, TargetClient> { if (!this.initialized) { throw new Error("TargetClients not initialized"); } return this._clientsByService; } get connectedClientsByService(): Map<string, ExtendedClientI> { if (!this.initialized) { throw new Error("TargetClients not initialized"); } return this._connectedClientsByService(); } private _connectedClientsByService(): Map<string, ExtendedClientI> { const connectedClients = new Map<string, ExtendedClientI>(); for (const [serviceName, client] of this._clientsByService.entries()) { if (client._state === "connected") { connectedClients.set(serviceName, client.extendedClient); } } return connectedClients; } async shutdown(): Promise<void> { this.logger.info("Shutting down TargetClients..."); for (const [name, client] of this._connectedClientsByService()) { try { await client.close(); this.logger.info("Client closed", { name }); } catch (e: unknown) { const error = loggableError(e); this.logger.error("Error closing client", { name, error }); } } } getTargetServer(name: string): TargetServer | undefined { if (!this.initialized) { throw new Error("TargetClients not initialized"); } return this.targetServers.find((server) => server.name === name); } async removeClient(name: string): Promise<void> { const client = this._clientsByService.get(name); if (!client) { this.logger.warn("Client not found", { name }); return Promise.reject(new NotFoundError()); } try { if (client._state === "connected") { await client.extendedClient.close(); } // Remove from targetServers and persist this.targetServers = this.targetServers.filter( (server) => server.name !== name, ); this.serverConfigManager.writeTargetServers(this.targetServers); this.recordClientRemoved(name); this.logger.info("Client removed", { name }); } catch (e: unknown) { const error = loggableError(e); this.logger.error("Error removing client", { name, error }); } } async addClient(targetServer: TargetServer): Promise<void> { if (this._clientsByService.has(targetServer.name)) { this.logger.warn("Client name already exists", { name: targetServer.name, }); return Promise.reject(new AlreadyExistsError()); } // Add to targetServers this.targetServers.push(targetServer); const client = await this.safeInitiateClient(targetServer); this.serverConfigManager.writeTargetServers(this.targetServers); await this.recordClientUpsert(client); this.logger.info("Client added", { name: targetServer.name }); } async reloadClients(): Promise<void> { // Disconnect all clients before reloading await Promise.all( Array.from(this._connectedClientsByService().entries()).map( ([name, client]) => { return client .close() .then(() => { this.recordClientRemoved(name); this.logger.info("Client closed", { name }); }) .catch((e: unknown) => { const error = loggableError(e); this.logger.error("Error closing client", { name, error, }); }); }, ), ); if (this._clientsByService.size !== 0) { this.logger.warn("Some clients were not closed properly", { clients: Array.from(this._clientsByService.keys()), }); this._clientsByService.clear(); } // Reconnect to all target servers await Promise.all( this.targetServers.map(async (server) => { const client = await this.safeInitiateClient(server); if (!client) return; await this.recordClientUpsert(client); }), ); } // A method to reuse existing OAuth tokens if available. // Will throw an error if the target server is not in pendingAuth state or // if reusing tokens fails (missing/rejected tokens) async reuseOAuthByName(targetServerName: string): Promise<ExtendedClientI> { const pendingAuth = await this.getPendingAuthClient(targetServerName); return await this.reuseOAuth(pendingAuth); } async initiateOAuth(targetServerName: string): Promise<void> { const pendingAuth = await this.getPendingAuthClient(targetServerName); const client = buildClient(targetServerName); try { const extendedClient = await this.oauthConnectionHandler.tryWithOAuth( pendingAuth.targetServer, client, ); this.logger.info("OAuth connection established", { targetServerName, }); const tools = await extendedClient.listTools(); if (LOG_FLAGS.LOG_DETAILED_TOOL_LISTINGS) { this.logger.debug("Available tools", { tools }); } else { this.logger.debug("Available tools names", { toolNames: tools.tools.map((tool) => tool.name), }); } // Update the clientsByService map - this will replace the pendingAuth entry const newTargetClient: TargetClient = { _state: "connected" as const, targetServer: pendingAuth.targetServer, extendedClient, }; await this.recordClientUpsert(newTargetClient); return; } catch (e) { const error = makeError(e); this.logger.error("Failed to initiate OAuth", { targetServerName, error: loggableError(error), }); const newTargetClient = { _state: "connection-failed" as const, targetServer: pendingAuth.targetServer, error, }; await this.recordClientUpsert(newTargetClient); return Promise.reject(error); } } // An internal method that reuses OAuth tokens for a given pendingAuth client. private async reuseOAuth( pendingAuth: PendingAuthTargetClient, ): Promise<ExtendedClientI> { const extendedClient = await this.oauthConnectionHandler.safeTryWithExistingTokens( pendingAuth.targetServer, ); if (!extendedClient) { this.logger.info("Could not reuse possible OAuth tokens", { targetServerName: pendingAuth.targetServer.name, }); // TODO: Add a more specific error message here return Promise.reject(new FailedToConnectToTargetServer()); } this.logger.info("OAuth connection established", { targetServerName: pendingAuth.targetServer.name, }); const tools = await extendedClient.listTools(); if (LOG_FLAGS.LOG_DETAILED_TOOL_LISTINGS) { this.logger.debug("Available tools", { tools }); } else { this.logger.debug("Available tools names", { toolNames: tools.tools.map((tool) => tool.name), }); } // Update the clientsByService map - this will replace the pendingAuth entry const newTargetClient: TargetClient = { _state: "connected" as const, targetServer: pendingAuth.targetServer, extendedClient, }; await this.recordClientUpsert(newTargetClient); return extendedClient; } // A method to record state about new client or update existing client. // Will update both the internal map and the system state tracker. private async recordClientUpsert( newTargetClient: TargetClient, ): Promise<void> { this._clientsByService.set( newTargetClient.targetServer.name, newTargetClient, ); const systemStateTargetServer = await prepareForSystemState(newTargetClient); this.systemState.recordTargetServerConnection(systemStateTargetServer); this.postChangeHook?.(this.targetServers); } // A method to record that a client was removed. // Will remove it from the internal map and update the system state tracker. private recordClientRemoved(name: string): void { this._clientsByService.delete(name); this.systemState.recordTargetServerDisconnected({ name }); this.postChangeHook?.(this.targetServers); } // A method to find and narrow down the type of a client to PendingAuthTargetClient. // Throws if not found or if the client is not in pendingAuth state. private async getPendingAuthClient( targetServerName: string, ): Promise<PendingAuthTargetClient> { const client = this._clientsByService.get(targetServerName); if (!client) { this.logger.error("No client found for target server", { targetServerName, }); return Promise.reject(new NotFoundError("Client not found")); } if (client._state === "pending-auth") { return client; } this.logger.info("Found client is not pendingAuth", { targetServerName, type: client._state, }); return Promise.reject(new NotFoundError("Client is not pendingAuth")); } // A method to safely initiate a client connection. // Should always return a resolved Promise. private async safeInitiateClient( targetServer: TargetServer, ): Promise<TargetClient> { // Error might be a connection error, either on an initial connection // or when retrying with OAuth. // We handle it once, in the end of the method, and return undefined if that happens. let error: Error | undefined; try { const extendedClient = await this.connectionFactory.createConnection(targetServer); return { _state: "connected", extendedClient, targetServer }; } catch (initialError) { error = makeError(initialError); // Check if OAuth is required if ( isAuthenticationError(error) && (targetServer.type === "sse" || targetServer.type === "streamable-http") ) { // Server requires OAuth authentication this.logger.warn( "Server requires OAuth authentication, attempting OAuth flow", { name: targetServer.name, type: targetServer.type }, ); // Attempt OAuth - this will try static OAuth if configured, or DCR as fallback // If the provider can't be created (e.g., missing credentials), it will throw try { return await this.initiateRemoteUnauthedClient(targetServer); } catch (oauthError) { error = makeError(oauthError); // Proceed to return connection-failed below } } // For non-OAuth errors or stdio connections, return undefined this.logger.error("Failed to initiate client", { name: targetServer.name, type: targetServer.type, error: loggableError(error), }); return { _state: "connection-failed", targetServer, error }; } } private async initiateRemoteUnauthedClient( targetServer: RemoteTargetServer, ): Promise<TargetClient> { const pendingAuthClient: PendingAuthTargetClient = { _state: "pending-auth", targetServer, }; try { const extendedClient = await this.reuseOAuth(pendingAuthClient); return { _state: "connected", extendedClient, targetServer }; } catch (reuseError) { const error = loggableError(reuseError); this.logger.debug( "Failed to reuse OAuth tokens, will mark as pendingAuth", { targetServerName: targetServer.name, error }, ); } this.logger.info( `Target server requires OAuth authentication, call /auth/initiate/${targetServer.name} to start`, { name: targetServer.name, type: targetServer.type }, ); return pendingAuthClient; } } async function prepareForSystemState( targetClient: TargetClient, ): Promise<TargetServerNewWithoutUsage> { switch (targetClient._state) { case "connected": { const state = { type: "connected" as const }; const { extendedClient, targetServer } = targetClient; const { tools } = await extendedClient.listTools(); const { tools: originalTools } = await extendedClient.originalTools(); switch (targetServer.type) { case "stdio": return { _type: "stdio", state, ...targetServer, tools, originalTools, }; case "sse": return { _type: "sse", state, ...targetServer, tools, originalTools }; case "streamable-http": return { _type: "streamable-http", state, ...targetServer, tools, originalTools, }; } break; } case "pending-auth": return { _type: targetClient.targetServer.type, state: { type: "pending-auth" }, ...targetClient.targetServer, tools: [], originalTools: [], }; case "connection-failed": switch (targetClient.targetServer.type) { case "stdio": return { _type: targetClient.targetServer.type, state: { type: "connection-failed", error: prepareError(targetClient.error), }, ...targetClient.targetServer, tools: [], originalTools: [], }; case "sse": case "streamable-http": return { _type: targetClient.targetServer.type, state: { type: "connection-failed", error: prepareError(targetClient.error), }, ...targetClient.targetServer, tools: [], originalTools: [], }; } } } function prepareError(error: Error): { name: string; message: string; stack: string | undefined; } { return { name: error.name, message: error.message, stack: error.stack, }; }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TheLunarCompany/lunar'

If you have feedback or need assistance with the MCP directory API, please join our Discord server