requestHandlers.ts•15.2 kB
import {
CallToolRequestSchema,
CallToolResultSchema,
CompleteRequest,
CompleteRequestSchema,
CreateMessageRequest,
CreateMessageRequestSchema,
ElicitRequest,
ElicitRequestSchema,
GetPromptRequestSchema,
ListPromptsRequest,
ListPromptsRequestSchema,
ListResourcesRequest,
ListResourcesRequestSchema,
ListResourceTemplatesRequest,
ListResourceTemplatesRequestSchema,
ListRootsRequest,
ListRootsRequestSchema,
ListToolsRequest,
ListToolsRequestSchema,
PingRequestSchema,
ReadResourceRequestSchema,
SetLevelRequestSchema,
SubscribeRequestSchema,
UnsubscribeRequestSchema,
} from '@modelcontextprotocol/sdk/types.js';
import { MCP_URI_SEPARATOR } from '@src/constants.js';
import { ClientManager } from '@src/core/client/clientManager.js';
import { byCapabilities } from '@src/core/filtering/clientFiltering.js';
import { FilteringService } from '@src/core/filtering/filteringService.js';
import { ServerManager } from '@src/core/server/serverManager.js';
import { ClientStatus, InboundConnection, OutboundConnections } from '@src/core/types/index.js';
import { setLogLevel } from '@src/logger/logger.js';
import logger from '@src/logger/logger.js';
import { withErrorHandling } from '@src/utils/core/errorHandling.js';
import { buildUri, parseUri } from '@src/utils/core/parsing.js';
import { getRequestTimeout } from '@src/utils/core/timeoutUtils.js';
import { handlePagination } from '@src/utils/ui/pagination.js';
/**
 * Registers, on every outbound client, the handlers for requests that are relayed to the
 * inbound connection's server: ping, message creation, input elicitation and root listing.
 *
 * Each handler runs through `ServerManager.current.executeServerOperation` and is wrapped by
 * `withErrorHandling` with its own error message.
 * @param outboundConns Outbound connections whose clients receive the handlers
 * @param inboundConn Inbound connection whose server performs the relayed operation
 */
function registerServerRequestHandlers(outboundConns: OutboundConnections, inboundConn: InboundConnection): void {
  for (const [, upstream] of Array.from(outboundConns.entries())) {
    // Built lazily so the timeout is resolved when a request is relayed, not at registration time.
    const relayOptions = () => ({ timeout: getRequestTimeout(upstream.transport) });

    upstream.client.setRequestHandler(
      PingRequestSchema,
      withErrorHandling(
        async () =>
          ServerManager.current.executeServerOperation(inboundConn, (conn: InboundConnection) => conn.server.ping()),
        'Error pinging',
      ),
    );

    upstream.client.setRequestHandler(
      CreateMessageRequestSchema,
      withErrorHandling(
        async (request: CreateMessageRequest) =>
          ServerManager.current.executeServerOperation(inboundConn, (conn: InboundConnection) =>
            conn.server.createMessage(request.params, relayOptions()),
          ),
        'Error creating message',
      ),
    );

    upstream.client.setRequestHandler(
      ElicitRequestSchema,
      withErrorHandling(
        async (request: ElicitRequest) =>
          ServerManager.current.executeServerOperation(inboundConn, (conn: InboundConnection) =>
            conn.server.elicitInput(request.params, relayOptions()),
          ),
        'Error eliciting input',
      ),
    );

    upstream.client.setRequestHandler(
      ListRootsRequestSchema,
      withErrorHandling(
        async (request: ListRootsRequest) =>
          ServerManager.current.executeServerOperation(inboundConn, (conn: InboundConnection) =>
            conn.server.listRoots(request.params, relayOptions()),
          ),
        'Error listing roots',
      ),
    );
  }
}
/**
 * Registers all request handlers for one inbound connection.
 *
 * The log-level and ping handlers are installed here directly; resource, tool, prompt,
 * completion and server-request handlers are installed by the dedicated helpers below.
 * @param outboundConns Outbound connections (upstream clients) that requests are routed to
 * @param inboundConn Inbound connection whose server receives the handlers
 */
export function registerRequestHandlers(outboundConns: OutboundConnections, inboundConn: InboundConnection): void {
  // Logging level handler: applies the requested level to the local logger.
  inboundConn.server.setRequestHandler(SetLevelRequestSchema, async (request) => {
    setLogLevel(request.params.level);
    return {};
  });

  // Ping handler: best-effort health check of every connected upstream, then an empty pong.
  inboundConn.server.setRequestHandler(
    PingRequestSchema,
    withErrorHandling(async () => {
      const healthCheckPromises = Array.from(outboundConns.entries()).map(async ([clientName, outboundConn]) => {
        // Only probe upstreams that are connected and still have a transport attached.
        if (outboundConn.status !== ClientStatus.Connected || !outboundConn.client.transport) {
          return;
        }
        try {
          // Use the same per-transport timeout as every other forwarded request so that a
          // hung upstream cannot stall the pong for the SDK's default request timeout.
          await outboundConn.client.ping({
            timeout: getRequestTimeout(outboundConn.transport),
          });
          logger.info(`Health check successful for client: ${clientName}`);
        } catch (error) {
          // A failing upstream is logged but must not fail the inbound ping.
          logger.warn(`Health check failed for client ${clientName}: ${error}`);
        }
      });

      // Wait for all health checks to complete (but don't fail if some fail)
      await Promise.allSettled(healthCheckPromises);

      // Always return successful pong response
      return {};
    }, 'Error handling ping'),
  );

  // Register resource-related handlers
  registerResourceHandlers(outboundConns, inboundConn);
  // Register tool-related handlers
  registerToolHandlers(outboundConns, inboundConn);
  // Register prompt-related handlers
  registerPromptHandlers(outboundConns, inboundConn);
  // Register completion-related handlers
  registerCompletionHandlers(outboundConns, inboundConn);
  // Register server-specific request handlers
  registerServerRequestHandlers(outboundConns, inboundConn);
}
/**
 * Registers the resource-related request handlers on the inbound connection's server:
 * list resources, list resource templates, subscribe, unsubscribe and read.
 *
 * Listed URIs are prefixed with the owning connection's name via `buildUri`; incoming URIs
 * are split back into connection name and upstream URI via `parseUri` before forwarding.
 * @param outboundConns Outbound connections to aggregate resources from
 * @param inboundConn Inbound connection whose server receives the handlers
 */
function registerResourceHandlers(outboundConns: OutboundConnections, inboundConn: InboundConnection): void {
  // List Resources handler
  inboundConn.server.setRequestHandler(
    ListResourcesRequestSchema,
    withErrorHandling(async (request: ListResourcesRequest) => {
      // First filter by capabilities, then by the inbound connection's filter
      const capabilityFilteredClients = byCapabilities({ resources: {} })(outboundConns);
      const filteredClients = FilteringService.getFilteredConnections(capabilityFilteredClients, inboundConn);

      const listing = await handlePagination(
        filteredClients,
        request.params || {},
        (client, params, opts) => client.listResources(params as ListResourcesRequest['params'], opts),
        (outboundConn, upstreamResult) =>
          upstreamResult.resources?.map((resource) => ({
            // Spread first so every upstream field reaches the caller, not only a fixed
            // subset; only the URI is rewritten to carry the connection prefix.
            ...resource,
            uri: buildUri(outboundConn.name, resource.uri, MCP_URI_SEPARATOR),
          })) ?? [],
        inboundConn.enablePagination ?? false,
      );

      return {
        resources: listing.items,
        nextCursor: listing.nextCursor,
      };
    }, 'Error listing resources'),
  );

  // List Resource Templates handler
  inboundConn.server.setRequestHandler(
    ListResourceTemplatesRequestSchema,
    withErrorHandling(async (request: ListResourceTemplatesRequest) => {
      // First filter by capabilities, then by the inbound connection's filter
      const capabilityFilteredClients = byCapabilities({ resources: {} })(outboundConns);
      const filteredClients = FilteringService.getFilteredConnections(capabilityFilteredClients, inboundConn);

      const listing = await handlePagination(
        filteredClients,
        request.params || {},
        (client, params, opts) => client.listResourceTemplates(params as ListResourceTemplatesRequest['params'], opts),
        (outboundConn, upstreamResult) =>
          upstreamResult.resourceTemplates?.map((template) => ({
            // Keep all upstream template fields; only the URI template gets the prefix.
            ...template,
            uriTemplate: buildUri(outboundConn.name, template.uriTemplate, MCP_URI_SEPARATOR),
          })) ?? [],
        inboundConn.enablePagination ?? false,
      );

      return {
        resourceTemplates: listing.items,
        nextCursor: listing.nextCursor,
      };
    }, 'Error listing resource templates'),
  );

  // Subscribe Resource handler
  inboundConn.server.setRequestHandler(
    SubscribeRequestSchema,
    withErrorHandling(async (request) => {
      const { clientName, resourceName } = parseUri(request.params.uri, MCP_URI_SEPARATOR);
      return ClientManager.current.executeClientOperation(clientName, (outboundConn) =>
        outboundConn.client.subscribeResource(
          // Forward the upstream's own URI, without the connection prefix.
          { ...request.params, uri: resourceName },
          {
            timeout: getRequestTimeout(outboundConn.transport),
          },
        ),
      );
    }, 'Error subscribing to resource'),
  );

  // Unsubscribe Resource handler
  inboundConn.server.setRequestHandler(
    UnsubscribeRequestSchema,
    withErrorHandling(async (request) => {
      const { clientName, resourceName } = parseUri(request.params.uri, MCP_URI_SEPARATOR);
      return ClientManager.current.executeClientOperation(clientName, (outboundConn) =>
        outboundConn.client.unsubscribeResource(
          { ...request.params, uri: resourceName },
          {
            timeout: getRequestTimeout(outboundConn.transport),
          },
        ),
      );
    }, 'Error unsubscribing from resource'),
  );

  // Read Resource handler
  inboundConn.server.setRequestHandler(
    ReadResourceRequestSchema,
    withErrorHandling(async (request) => {
      const { clientName, resourceName } = parseUri(request.params.uri, MCP_URI_SEPARATOR);
      return ClientManager.current.executeClientOperation(clientName, async (outboundConn) => {
        const resource = await outboundConn.client.readResource(
          { ...request.params, uri: resourceName },
          {
            timeout: getRequestTimeout(outboundConn.transport),
          },
        );

        // Re-apply the connection prefix to every returned content URI.
        return {
          ...resource,
          contents: resource.contents.map((content) => ({
            ...content,
            uri: buildUri(outboundConn.name, content.uri, MCP_URI_SEPARATOR),
          })),
        };
      });
    }, 'Error reading resource'),
  );
}
/**
 * Registers the tool-related request handlers on the inbound connection's server:
 * list tools and call tool.
 *
 * Listed tool names are prefixed with the owning connection's name via `buildUri`; a call
 * splits the name back with `parseUri` and forwards it to the named connection.
 * @param outboundConns Outbound connections to aggregate tools from
 * @param inboundConn Inbound connection whose server receives the handlers
 */
function registerToolHandlers(outboundConns: OutboundConnections, inboundConn: InboundConnection): void {
  // List Tools handler
  inboundConn.server.setRequestHandler(
    ListToolsRequestSchema,
    withErrorHandling(async (request: ListToolsRequest) => {
      // First filter by capabilities, then by the inbound connection's filter
      const capabilityFilteredClients = byCapabilities({ tools: {} })(outboundConns);
      const filteredClients = FilteringService.getFilteredConnections(capabilityFilteredClients, inboundConn);

      const listing = await handlePagination(
        filteredClients,
        request.params || {},
        (client, params, opts) => client.listTools(params as ListToolsRequest['params'], opts),
        (outboundConn, upstreamResult) =>
          upstreamResult.tools?.map((tool) => ({
            // Spread first so every upstream tool field is passed through unchanged;
            // only the name is rewritten to carry the connection prefix.
            ...tool,
            name: buildUri(outboundConn.name, tool.name, MCP_URI_SEPARATOR),
          })) ?? [],
        inboundConn.enablePagination ?? false,
      );

      return {
        tools: listing.items,
        nextCursor: listing.nextCursor,
      };
    }, 'Error listing tools'),
  );

  // Call Tool handler
  inboundConn.server.setRequestHandler(
    CallToolRequestSchema,
    withErrorHandling(async (request) => {
      const { clientName, resourceName: toolName } = parseUri(request.params.name, MCP_URI_SEPARATOR);
      return ClientManager.current.executeClientOperation(clientName, (outboundConn) =>
        // Forward under the upstream's own tool name, without the connection prefix.
        outboundConn.client.callTool({ ...request.params, name: toolName }, CallToolResultSchema, {
          timeout: getRequestTimeout(outboundConn.transport),
        }),
      );
    }, 'Error calling tool'),
  );
}
/**
 * Registers the prompt-related request handlers on the inbound connection's server:
 * list prompts and get prompt.
 *
 * Listed prompt names are prefixed with the owning connection's name via `buildUri`; a get
 * request splits the name back with `parseUri` and forwards it to the named connection.
 * @param outboundConns Outbound connections to aggregate prompts from
 * @param inboundConn Inbound connection whose server receives the handlers
 */
function registerPromptHandlers(outboundConns: OutboundConnections, inboundConn: InboundConnection): void {
  // List Prompts handler
  inboundConn.server.setRequestHandler(
    ListPromptsRequestSchema,
    withErrorHandling(async (request: ListPromptsRequest) => {
      // First filter by capabilities, then by the inbound connection's filter
      const capabilityFilteredClients = byCapabilities({ prompts: {} })(outboundConns);
      const filteredClients = FilteringService.getFilteredConnections(capabilityFilteredClients, inboundConn);

      const listing = await handlePagination(
        filteredClients,
        request.params || {},
        (client, params, opts) => client.listPrompts(params as ListPromptsRequest['params'], opts),
        (outboundConn, upstreamResult) =>
          upstreamResult.prompts?.map((prompt) => ({
            // Spread first so every upstream prompt field is passed through unchanged;
            // only the name is rewritten to carry the connection prefix.
            ...prompt,
            name: buildUri(outboundConn.name, prompt.name, MCP_URI_SEPARATOR),
          })) ?? [],
        inboundConn.enablePagination ?? false,
      );

      return {
        prompts: listing.items,
        nextCursor: listing.nextCursor,
      };
    }, 'Error listing prompts'),
  );

  // Get Prompt handler
  inboundConn.server.setRequestHandler(
    GetPromptRequestSchema,
    withErrorHandling(async (request) => {
      const { clientName, resourceName: promptName } = parseUri(request.params.name, MCP_URI_SEPARATOR);
      return ClientManager.current.executeClientOperation(clientName, (outboundConn) =>
        outboundConn.client.getPrompt(
          { ...request.params, name: promptName },
          {
            // Same per-transport timeout as the other forwarded requests in this file.
            timeout: getRequestTimeout(outboundConn.transport),
          },
        ),
      );
    }, 'Error getting prompt'),
  );
}
/**
 * Registers the completion request handler on the inbound connection's server.
 *
 * The reference inside the request (a prompt name or a resource URI) is split with `parseUri`
 * into the target connection name and the upstream identifier, and the request is forwarded
 * to that connection with the unprefixed reference.
 * @param outboundConns Outbound connections (not read by this function)
 * @param inboundConn Inbound connection whose server receives the handler
 */
function registerCompletionHandlers(outboundConns: OutboundConnections, inboundConn: InboundConnection): void {
  inboundConn.server.setRequestHandler(
    CompleteRequestSchema,
    withErrorHandling(async (request: CompleteRequest) => {
      const { ref } = request.params;
      let clientName: string;
      let updatedRef: typeof ref;

      switch (ref.type) {
        case 'ref/prompt': {
          const parsed = parseUri(ref.name, MCP_URI_SEPARATOR);
          clientName = parsed.clientName;
          updatedRef = { ...ref, name: parsed.resourceName };
          break;
        }
        case 'ref/resource': {
          const parsed = parseUri(ref.uri, MCP_URI_SEPARATOR);
          clientName = parsed.clientName;
          updatedRef = { ...ref, uri: parsed.resourceName };
          break;
        }
        default:
          // Schema validation should already reject other reference types; this is a safeguard.
          throw new Error(`Unsupported completion reference type: ${(ref as { type: string }).type}`);
      }

      const forwardedParams = { ...request.params, ref: updatedRef };

      // NOTE(review): the trailing `{}` and 'completions' arguments are passed through as-is;
      // 'completions' is presumably the capability required of the target — confirm against
      // ClientManager.executeClientOperation.
      return ClientManager.current.executeClientOperation(
        clientName,
        (outboundConn) =>
          outboundConn.client.complete(forwardedParams, {
            timeout: getRequestTimeout(outboundConn.transport),
          }),
        {},
        'completions',
      );
    }, 'Error handling completion'),
  );
}