/**
* Aggregates tools, resources, and prompts from multiple downstream MCP
* client connections and routes calls to the correct server.
*/
import type { ClientConnection } from "../client/connection.js";
import {
addToolPrefix,
removeToolPrefix,
addResourcePrefix,
removeResourcePrefix,
addPromptPrefix,
removePromptPrefix,
} from "./namespacing.js";
import { ProxyError } from "../utils/errors.js";
import logger from "../utils/logger.js";
export class Aggregator {
private connections: Map<string, ClientConnection>;
constructor(connections: Map<string, ClientConnection>) {
this.connections = connections;
}
updateConnections(connections: Map<string, ClientConnection>): void {
this.connections = connections;
}
/**
* Try to connect all servers in parallel. Returns silently for failures.
* Used by listing methods to ensure servers are connected on demand.
*/
private async connectAllBestEffort(): Promise<void> {
await Promise.allSettled(
[...this.connections.entries()].map(async ([serverId, conn]) => {
if (conn.isConnected) return;
try {
await conn.ensureConnected();
} catch (err) {
logger.debug(`On-demand connect failed for ${serverId}`, {
error: (err as Error).message,
});
}
})
);
}
async listTools(): Promise<unknown[]> {
await this.connectAllBestEffort();
const tools: unknown[] = [];
for (const [serverId, conn] of this.connections) {
if (conn.isConnected) {
try {
const client = conn.getClient();
const result = await client.listTools();
for (const tool of result.tools) {
tools.push({
...tool,
name: addToolPrefix(serverId, tool.name),
});
}
} catch (err) {
logger.warn(`Failed to list tools from ${serverId}`, {
error: (err as Error).message,
});
}
} else if (conn.status.capabilities) {
// Use cached capabilities for disconnected servers
for (const tool of conn.status.capabilities.tools) {
tools.push({
...tool,
name: addToolPrefix(serverId, tool.name),
});
}
} else {
logger.debug(`Skipping ${serverId} for listTools: not connected and no cached capabilities`);
}
}
return tools;
}
async listResources(): Promise<unknown[]> {
await this.connectAllBestEffort();
const resources: unknown[] = [];
for (const [serverId, conn] of this.connections) {
if (conn.isConnected) {
try {
const client = conn.getClient();
const result = await client.listResources();
for (const resource of result.resources) {
resources.push({
...resource,
uri: addResourcePrefix(serverId, resource.uri),
});
}
} catch (err) {
logger.warn(`Failed to list resources from ${serverId}`, {
error: (err as Error).message,
});
}
} else if (conn.status.capabilities) {
for (const resource of conn.status.capabilities.resources) {
resources.push({
...resource,
uri: addResourcePrefix(serverId, resource.uri),
});
}
} else {
logger.debug(`Skipping ${serverId} for listResources: not connected and no cached capabilities`);
}
}
return resources;
}
async listResourceTemplates(): Promise<unknown[]> {
await this.connectAllBestEffort();
const templates: unknown[] = [];
for (const [serverId, conn] of this.connections) {
if (conn.isConnected) {
try {
const client = conn.getClient();
const result = await client.listResourceTemplates();
for (const template of result.resourceTemplates) {
templates.push({
...template,
uriTemplate: addResourcePrefix(serverId, template.uriTemplate),
});
}
} catch (err) {
logger.warn(`Failed to list resource templates from ${serverId}`, {
error: (err as Error).message,
});
}
} else if (conn.status.capabilities) {
for (const template of conn.status.capabilities.resourceTemplates) {
templates.push({
...template,
uriTemplate: addResourcePrefix(serverId, template.uriTemplate),
});
}
} else {
logger.debug(`Skipping ${serverId} for listResourceTemplates: not connected and no cached capabilities`);
}
}
return templates;
}
async listPrompts(): Promise<unknown[]> {
await this.connectAllBestEffort();
const prompts: unknown[] = [];
for (const [serverId, conn] of this.connections) {
if (conn.isConnected) {
try {
const client = conn.getClient();
const result = await client.listPrompts();
for (const prompt of result.prompts) {
prompts.push({
...prompt,
name: addPromptPrefix(serverId, prompt.name),
});
}
} catch (err) {
logger.warn(`Failed to list prompts from ${serverId}`, {
error: (err as Error).message,
});
}
} else if (conn.status.capabilities) {
for (const prompt of conn.status.capabilities.prompts) {
prompts.push({
...prompt,
name: addPromptPrefix(serverId, prompt.name),
});
}
} else {
logger.debug(`Skipping ${serverId} for listPrompts: not connected and no cached capabilities`);
}
}
return prompts;
}
async callTool(
namespacedName: string,
args?: Record<string, unknown>,
): Promise<unknown> {
const parsed = removeToolPrefix(namespacedName);
if (!parsed) {
throw new ProxyError(`Invalid namespaced tool name: ${namespacedName}`);
}
const { serverId, toolName } = parsed;
const conn = this.connections.get(serverId);
if (!conn) {
throw new ProxyError(`Server "${serverId}" is not connected`);
}
try {
await conn.ensureConnected();
} catch (err) {
throw new ProxyError(`Server "${serverId}" is not connected: ${(err as Error).message}`);
}
logger.debug(`Routing tool call: ${toolName} -> ${serverId}`);
const client = conn.getClient();
return client.callTool({ name: toolName, arguments: args });
}
async readResource(namespacedUri: string): Promise<unknown> {
const parsed = removeResourcePrefix(namespacedUri);
if (!parsed) {
throw new ProxyError(
`Invalid namespaced resource URI: ${namespacedUri}`,
);
}
const { serverId, uri } = parsed;
const conn = this.connections.get(serverId);
if (!conn) {
throw new ProxyError(`Server "${serverId}" is not connected`);
}
try {
await conn.ensureConnected();
} catch (err) {
throw new ProxyError(`Server "${serverId}" is not connected: ${(err as Error).message}`);
}
logger.debug(`Routing resource read: ${uri} -> ${serverId}`);
const client = conn.getClient();
return client.readResource({ uri });
}
async getPrompt(
namespacedName: string,
args?: Record<string, string>,
): Promise<unknown> {
const parsed = removePromptPrefix(namespacedName);
if (!parsed) {
throw new ProxyError(
`Invalid namespaced prompt name: ${namespacedName}`,
);
}
const { serverId, promptName } = parsed;
const conn = this.connections.get(serverId);
if (!conn) {
throw new ProxyError(`Server "${serverId}" is not connected`);
}
try {
await conn.ensureConnected();
} catch (err) {
throw new ProxyError(`Server "${serverId}" is not connected: ${(err as Error).message}`);
}
logger.debug(`Routing prompt: ${promptName} -> ${serverId}`);
const client = conn.getClient();
return client.getPrompt({ name: promptName, arguments: args });
}
}