import {
appConfigSchema,
ApplyParsedAppConfigRequest,
SerializedAppConfig,
SystemState,
TargetServerRequest,
} from "@mcpx/shared-model";
import { stringifyEq } from "@mcpx/toolkit-core/data";
import { loggableError, LunarLogger } from "@mcpx/toolkit-core/logging";
import { stringify } from "yaml";
import { z } from "zod/v4";
import { ConfigService, ConfigSnapshot } from "../config.js";
import {
AlreadyExistsError,
FailedToConnectToTargetServer,
NotFoundError,
} from "../errors.js";
import { TargetServer, targetServerSchema } from "../model/target-servers.js";
import { ControlPlaneConfigService } from "./control-plane-config-service.js";
import { SystemStateTracker } from "./system-state.js";
import { TargetClients } from "./target-clients.js";
export function sanitizeTargetServerForTelemetry(
server: TargetServerRequest | TargetServer,
): Record<string, unknown> {
switch (server.type) {
case "stdio":
return {
name: server.name,
type: server.type,
command: server.command,
args: server.args,
};
case "sse":
case "streamable-http":
return server;
}
}
export class ControlPlaneService {
private systemState: SystemStateTracker;
private targetClients: TargetClients;
private configService: ConfigService; // Dependency in deprecation - use this.config
private logger: LunarLogger;
public config: ControlPlaneConfigService;
constructor(
metricRecorder: SystemStateTracker,
targetClients: TargetClients,
configService: ConfigService,
logger: LunarLogger,
) {
this.systemState = metricRecorder;
this.targetClients = targetClients;
this.configService = configService;
this.config = new ControlPlaneConfigService(configService, logger);
this.logger = logger.child({ component: "ControlPlaneService" });
}
subscribeToAppConfigUpdates(
callback: (state: ConfigSnapshot) => void,
): () => void {
this.logger.debug("Subscribing to app config updates");
return this.config.subscribe(callback);
}
subscribeToSystemStateUpdates(
callback: (state: SystemState) => void,
): () => void {
this.logger.debug("Subscribing to system state updates");
return this.systemState.subscribe(callback);
}
getSystemState(): SystemState {
this.logger.debug("Received GetSystemState event from Control Plane");
return this.systemState.export();
}
// In deprecation: read dedicated config resources instead (`.config...`)
getAppConfig(): SerializedAppConfig {
this.logger.debug("Received GetAppConfig event from Control Plane");
return {
version: this.config.getVersion(),
lastModified: this.config.getLastModified(),
yaml: stringify(this.config.getConfig()),
};
}
// In deprecation: create/update/delete dedicated config resources instead (`.config...`)
async patchAppConfig(
payload: ApplyParsedAppConfigRequest,
): Promise<SerializedAppConfig> {
const parsedConfig = appConfigSchema.safeParse(payload);
if (!parsedConfig.success) {
this.logger.error("Invalid config schema in PatchAppConfig request", {
payload,
error: parsedConfig.error,
});
throw parsedConfig.error;
}
return this.configService.withLock(async () => {
// Get current config before update to compare
const currentConfig = this.configService.getConfig();
const updated = await this.configService.updateConfig(parsedConfig.data);
const updatedAppConfig: SerializedAppConfig = {
yaml: stringify(this.configService.getConfig()),
version: this.configService.getVersion(),
lastModified: this.configService.getLastModified(),
};
if (!updated) {
this.logger.info("No changes in app config, skipping update", {
updatedAppConfig,
});
return updatedAppConfig;
}
// Only reload clients if server-related config changed
// toolExtensions is the only config field that affects server connections
const newConfig = this.configService.getConfig();
const serverConfigChanged = !stringifyEq(
currentConfig.toolExtensions,
newConfig.toolExtensions,
);
if (serverConfigChanged) {
this.logger.info(
"Server-related config (toolExtensions) changed, reloading target clients",
);
await this.targetClients.reloadClients();
} else {
this.logger.info(
"Only non-server config changed (permissions/toolGroups/auth/targetServerAttributes), skipping client reload",
);
}
return updatedAppConfig;
});
}
async addTargetServer(
payload: TargetServerRequest,
): Promise<TargetServer | undefined> {
// Do not include env vars in logs when adding server (any type)
const data: Record<string, unknown> = {
...(payload as unknown as Record<string, unknown>),
};
if ("env" in data) {
delete (data as { env?: unknown }).env;
}
this.logger.info("Received AddTargetServer event from Control Plane", {
data,
});
try {
await this.targetClients.addClient(payload);
this.logger.info(`Target server ${payload.name} created successfully`);
this.logger.telemetry.info("target server added", {
mcpServers: {
[payload.name]: sanitizeTargetServerForTelemetry(payload),
},
});
return this.targetClients.getTargetServer(payload.name);
} catch (e: unknown) {
const error = loggableError(e);
this.logger.error(`Failed to create target server ${payload.name}`, {
error,
data: data,
});
if (
e instanceof NotFoundError ||
e instanceof AlreadyExistsError ||
e instanceof FailedToConnectToTargetServer
) {
throw e;
}
throw new Error(`Failed to create target server: ${error.errorMessage}`);
}
}
// TODO: make sure failed update does not leave the system in an inconsistent state
async updateTargetServer(
name: string,
payload: z.infer<typeof targetServerSchema>,
): Promise<TargetServer | undefined> {
this.logger.info("Received UpdateTargetServer event from Control Plane");
const existingTargetServer = this.targetClients.getTargetServer(name);
// Prepare sanitized copies for logging (remove env from any type)
const cleanPayload: Record<string, unknown> = {
...(payload as unknown as Record<string, unknown>),
};
if ("env" in cleanPayload) {
delete (cleanPayload as { env?: unknown }).env;
}
const cleanExisting: Record<string, unknown> | undefined =
existingTargetServer
? ({
...(existingTargetServer as unknown as Record<string, unknown>),
} as Record<string, unknown>)
: undefined;
if (cleanExisting && "env" in cleanExisting) {
delete (cleanExisting as { env?: unknown }).env;
}
if (!existingTargetServer) {
this.logger.error(`Target server ${name} not found for update`, {
data: cleanPayload,
});
throw new NotFoundError();
}
this.logger.info(`Updating target server ${name}`, {
existingTargetServer: cleanExisting,
payload: cleanPayload,
});
try {
// TODO: replace with safe-swap technique:
// Add new client with temp name, if successful, remove old client and rename new one
// as non-failable operation
await this.targetClients.removeClient(name);
await this.targetClients.addClient({ ...payload, name });
this.logger.info(`Target server ${name} updated successfully`);
this.logger.telemetry.info("target server updated", {
mcpServers: {
[name]: sanitizeTargetServerForTelemetry({ ...payload, name }),
},
});
return this.targetClients.getTargetServer(name);
} catch (e: unknown) {
this.logger.error(`Failed to update target server ${name}`, {
error: e,
data: cleanPayload,
});
throw e;
}
}
async removeTargetServer(name: string): Promise<void> {
this.logger.info(
"Received RemoveTargetServer event from Control Plane",
name,
);
await this.targetClients.removeClient(name);
await this.config.removeTargetServerAttribute(name).catch((e) => {
this.logger.warn(
`Failed to remove target server ${name} from config's attributes during removal`,
{ error: loggableError(e) },
);
});
this.logger.info(`Target server ${name} removed successfully`);
this.logger.telemetry.info("target server removed", {
mcpServers: { [name]: null },
});
}
}