Remote-MCP Server
by ssut
import { EventEmitter, on } from 'node:events';
import { ConsoleLogger, LogLevel, type Logger } from './logger.js';
import {
type BlobResourceContents,
CallToolRequestSchema,
type CallToolResult,
CallToolResultSchema,
GetPromptRequestSchema,
GetPromptResultSchema,
type Implementation,
type InitializeRequest,
InitializeRequestSchema,
type InitializeResult,
InitializeResultSchema,
LATEST_PROTOCOL_VERSION,
ListPromptsRequestSchema,
ListPromptsResultSchema,
ListResourcesRequestSchema,
ListResourcesResultSchema,
ListToolsRequestSchema,
ListToolsResultSchema,
type Prompt,
type PromptMessage,
ReadResourceRequestSchema,
ReadResourceResultSchema,
type Resource,
type ServerCapabilities,
type ServerNotification,
SubscribeRequestSchema,
type Tool,
UnsubscribeRequestSchema,
} from '@modelcontextprotocol/sdk/types.js';
import { initTRPC } from '@trpc/server';
import { ZodError, type z } from 'zod';
import { zodToJsonSchema } from 'zod-to-json-schema';
export type ToolHandler<T> = (args: T) => Promise<CallToolResult>;
export type ResourceHandler = () => Promise<Resource[]>;
export type ResourceContentHandler = () => Promise<BlobResourceContents[]>;
export type PromptHandler = (
args: Record<string, string>,
) => Promise<PromptMessage[]>;
export interface ToolDefinition<T> {
description: string;
schema: z.ZodType<T>;
middlewares?: Middleware[];
}
export interface ResourceDefinition {
name: string;
description?: string;
mimeType?: string;
subscribe?: boolean;
listHandler: ResourceHandler;
readHandler: ResourceContentHandler;
middlewares?: Middleware[];
}
export interface PromptDefinition {
description?: string;
arguments?: {
name: string;
description?: string;
required?: boolean;
}[];
middlewares?: Middleware[];
}
export interface MCPRouterOptions {
name: string;
version: string;
capabilities?: Partial<ServerCapabilities>;
logger?: Logger;
logLevel?: LogLevel;
}
export type Middleware = (
request: unknown,
next: (modifiedRequest: unknown) => Promise<unknown>,
) => Promise<unknown>;
interface Events {
notification: (notification: ServerNotification) => void;
resourceListChanged: () => void;
resourceUpdated: (uri: string) => void;
toolListChanged: () => void;
promptListChanged: () => void;
rootsListChanged: () => void;
}
export class MCPRouter {
private logger: Logger;
private tools = new Map<
string,
{
// biome-ignore lint/suspicious/noExplicitAny: <explanation>
definition: ToolDefinition<any>;
// biome-ignore lint/suspicious/noExplicitAny: <explanation>
handler: ToolHandler<any>;
}
>();
private resources = new Map<string, ResourceDefinition>();
private prompts = new Map<
string,
{
definition: PromptDefinition;
handler: PromptHandler;
}
>();
private subscriptions = new Map<string, Set<string>>();
private implementation: Implementation;
private capabilities: ServerCapabilities;
private events = new EventEmitter();
constructor(options: MCPRouterOptions) {
this.logger =
options.logger ||
new ConsoleLogger({
level: options.logLevel || LogLevel.INFO,
prefix: options.name,
});
this.implementation = {
name: options.name,
version: options.version,
};
this.capabilities = {
tools: options.capabilities?.tools ?? {
listChanged: false,
},
resources: options.capabilities?.resources ?? {
subscribe: false,
listChanged: false,
},
prompts: options.capabilities?.prompts ?? {
listChanged: false,
},
logging: options.capabilities?.logging,
experimental: options.capabilities?.experimental,
};
this.logger.debug('MCPRouter initialized', {
name: options.name,
version: options.version,
capabilities: this.capabilities,
});
}
private emit<T extends keyof Events>(
event: T,
...args: Parameters<Events[T]>
): void {
this.events.emit(event, ...args);
}
private _emitNotification(notification: ServerNotification) {
this.emit('notification', notification);
}
private _emitResourceListChanged() {
this._emitNotification({
method: 'notifications/resources/list_changed',
params: {},
});
}
private _emitResourceUpdated(uri: string) {
this._emitNotification({
method: 'notifications/resources/updated',
params: { uri },
});
}
private _emitToolListChanged() {
this._emitNotification({
method: 'notifications/tools/list_changed',
params: {},
});
}
private _emitPromptListChanged() {
this._emitNotification({
method: 'notifications/prompts/list_changed',
params: {},
});
}
addTool<T>(
name: string,
definition: ToolDefinition<T>,
handler: ToolHandler<T>,
) {
this.logger.debug('Adding tool', { name, definition });
this.tools.set(name, { definition, handler });
this._emitToolListChanged();
return this;
}
async listTools(): Promise<Tool[]> {
this.logger.debug('Listing resources');
const tools = Array.from(this.tools.entries()).map(
([name, { definition }]) => {
const schema = {
...zodToJsonSchema(definition.schema),
$schema: undefined,
};
return {
name,
description: definition.description,
inputSchema: {
type: 'object',
properties: schema,
},
} satisfies Tool;
},
);
return tools;
}
async callTool(name: string, args: unknown): Promise<CallToolResult> {
this.logger.debug('Calling tool', { name, args });
const tool = this.tools.get(name);
if (!tool) {
throw new Error(`Tool not found: ${name}`);
}
const { definition, handler } = tool;
const validatedArgs = definition.schema.parse(args);
this.logger.debug('Tool args validated', { validatedArgs });
let result = validatedArgs;
if (definition.middlewares) {
for (const middleware of definition.middlewares) {
result = await middleware(result, async (modified) => modified);
}
}
return handler(result);
}
addResource(uri: string, definition: ResourceDefinition) {
this.resources.set(uri, definition);
this._emitResourceListChanged();
return this;
}
async listResources(): Promise<Resource[]> {
const resources: Resource[] = [];
for (const [uri, def] of this.resources) {
const items = await def.listHandler();
resources.push(
...items.map((item) => ({
...item,
uri,
name: def.name,
description: def.description,
mimeType: def.mimeType,
})),
);
}
return resources;
}
async readResource(uri: string): Promise<BlobResourceContents[]> {
const resource = this.resources.get(uri);
if (!resource) {
throw new Error(`Resource not found: ${uri}`);
}
let result = { uri };
if (resource.middlewares) {
for (const middleware of resource.middlewares) {
result = (await middleware(result, async (modified) => modified)) as {
uri: string;
};
}
}
return resource.readHandler();
}
async subscribeToResource(uri: string): Promise<void> {
if (!this.capabilities.resources?.subscribe) {
throw new Error('Resource subscription not supported');
}
const resource = this.resources.get(uri);
if (!resource) {
throw new Error(`Resource not found: ${uri}`);
}
if (!resource.subscribe) {
throw new Error(`Resource ${uri} does not support subscriptions`);
}
if (!this.subscriptions.has(uri)) {
this.subscriptions.set(uri, new Set());
}
// biome-ignore lint/style/noNonNullAssertion: <explanation>
this.subscriptions.get(uri)!.add(uri);
}
async unsubscribeFromResource(uri: string): Promise<void> {
const subscribers = this.subscriptions.get(uri);
if (subscribers) {
subscribers.delete(uri);
if (subscribers.size === 0) {
this.subscriptions.delete(uri);
}
}
}
// Prompt methods
addPrompt(
name: string,
definition: PromptDefinition,
handler: PromptHandler,
) {
this.prompts.set(name, { definition, handler });
this._emitPromptListChanged();
return this;
}
async listPrompts(): Promise<Prompt[]> {
return Array.from(this.prompts.entries()).map(([name, { definition }]) => ({
name,
description: definition.description,
arguments: definition.arguments,
}));
}
async getPrompt(
name: string,
args: Record<string, string> = {},
): Promise<PromptMessage[]> {
this.logger.debug('Getting prompt', { name, args });
const prompt = this.prompts.get(name);
if (!prompt) {
const error = `Prompt not found: ${name}`;
this.logger.error(error);
throw new Error(error);
}
const { definition, handler } = prompt;
// Validate required arguments
if (definition.arguments) {
for (const arg of definition.arguments) {
if (arg.required && !(arg.name in args)) {
const error = `Missing required argument: ${arg.name}`;
this.logger.error(error, { name, args });
throw new Error(error);
}
}
}
// Apply middlewares
let modifiedArgs = args;
if (definition.middlewares) {
for (const middleware of definition.middlewares) {
modifiedArgs = (await middleware(
modifiedArgs,
async (modified) => modified,
)) as Record<string, string>;
}
this.logger.debug('Prompt middleware applied', { modifiedArgs });
}
const messages = await handler(modifiedArgs);
this.logger.debug('Prompt executed successfully', {
name,
messageCount: messages.length,
});
return messages;
}
// Initialize handler
async initialize(request: InitializeRequest): Promise<InitializeResult> {
this.logger.debug('Initializing', { request });
return {
id: 'RemoteMCP',
protocolVersion: LATEST_PROTOCOL_VERSION,
capabilities: this.capabilities,
serverInfo: this.implementation,
};
}
// Create tRPC router
createTRPCRouter() {
const t = initTRPC.create({
errorFormatter({ type, path, input, shape, error }) {
return {
...shape,
data: {
...shape.data,
zodError:
error.code === 'BAD_REQUEST' && error.cause instanceof ZodError
? error.cause.flatten()
: null,
},
};
},
});
const router = t.router;
const publicProcedure = t.procedure;
const events = this.events;
const appRouter = router({
ping: publicProcedure.query(() => 'pong'),
initialize: publicProcedure
.input(InitializeRequestSchema)
.output(InitializeResultSchema)
.mutation(async ({ input }) => {
const { params } = { params: input };
return this.initialize(params);
}),
'tools/list': publicProcedure
.input(ListToolsRequestSchema)
.output(ListToolsResultSchema)
.query(async ({ input }) => ({
tools: await this.listTools(),
// ...(input.params?.cursor && { nextCursor: input.params?.cursor }),
})),
'tools/call': publicProcedure
.input(CallToolRequestSchema)
.output(CallToolResultSchema)
.mutation(async ({ input }) =>
this.callTool(input.params.name, input.params.arguments),
),
// Resources endpoints
'resources/list': publicProcedure
.input(ListResourcesRequestSchema)
.output(ListResourcesResultSchema)
.query(async ({ input }) => ({
resources: await this.listResources(),
})),
'resources/read': publicProcedure
.input(ReadResourceRequestSchema)
.output(ReadResourceResultSchema)
.query(async ({ input }) => ({
contents: await this.readResource(input.params.uri),
})),
'resources/subscribe': publicProcedure
.input(SubscribeRequestSchema)
.mutation(async ({ input }) => {
await this.subscribeToResource(input.params.uri);
return {};
}),
'resources/unsubscribe': publicProcedure
.input(UnsubscribeRequestSchema)
.mutation(async ({ input }) => {
await this.unsubscribeFromResource(input.params.uri);
return {};
}),
// Prompts endpoints
'prompts/list': publicProcedure
.input(ListPromptsRequestSchema)
.output(ListPromptsResultSchema)
.query(async ({ input }) => ({
prompts: await this.listPrompts(),
// ...(input?.cursor && { nextCursor: input.cursor }),
})),
'prompts/get': publicProcedure
.input(GetPromptRequestSchema)
.output(GetPromptResultSchema)
.query(async ({ input }) => ({
messages: await this.getPrompt(
input.params.name,
input.params.arguments,
),
})),
'notifications/stream': publicProcedure.subscription(async function* () {
try {
for await (const event of on(events, 'notification', {
signal: undefined,
})) {
yield* event;
}
} finally {
}
}),
});
return appRouter;
}
}
interface Events {
resourceListChanged: () => void;
resourceUpdated: (uri: string) => void;
toolListChanged: () => void;
promptListChanged: () => void;
rootsListChanged: () => void;
}