// createMcpServer.ts (14 kB)
import { JSONSchemaToZod } from "@dmitryrechkin/json-schema-to-zod";
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import {
CallToolRequestSchema,
GetPromptRequestSchema,
ListPromptsRequestSchema,
ListResourcesRequestSchema,
ListResourceTemplatesRequestSchema,
ListToolsRequestSchema,
ReadResourceRequestSchema,
SetLevelRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import { getServerCapabilities } from "./getServerCapabilities.js";
import { type McpServerConfig, type ServerInstance } from "./main.js";
import { setupAllNotificationHandlers } from "./notificationHandlers.js";
import { selectTransport } from "./selectTransport.js";
/**
 * Creates a bridge for a single upstream MCP server.
 *
 * The function connects an MCP `Client` to the upstream server described by
 * `serverConfig`, lists its tools / prompts / resources / resource templates,
 * and builds a local `McpServer` whose request handlers forward to that
 * client. It also installs a reconnect routine on `client.onclose` that makes
 * up to three attempts with a linearly growing delay.
 *
 * @param serverName - Label used in log prefixes and in the client/server names.
 * @param serverConfig - Configuration handed to `selectTransport`.
 * @returns The bridge bundle (config, server, client, transport), or `null`
 *   when the initial connection fails or does not complete within 10 seconds.
 * @throws Error when `selectTransport` yields no transport, and rethrows the
 *   listing error when `listResourceTemplates` fails with the MCP
 *   "Request timed out" (-32001) error.
 */
export async function createMcpServer(
  serverName: string,
  serverConfig: McpServerConfig,
): Promise<ServerInstance | null> {
  // Pick the transport that matches the configuration.
  let transport = selectTransport(serverConfig);
  if (!transport) {
    throw new Error(
      "Failed to create transport, please check the configuration.",
    );
  }
  console.log("transport", transport);
  console.log("clienttransport", transport);
  transport.onclose = () => {
    console.log(`[${serverName}] Transport connection closed`);
  };
  transport.onerror = (error) => {
    console.error(`[${serverName}] Transport connection error:`, error);
  };

  const client = new Client(
    { name: `bridge-client-${serverName}`, version: "1.0.0" },
    {
      capabilities: {
        tools: {},
        resources: {},
        prompts: {},
      },
    },
  );
  // Temporary handlers used while the bridge is being set up; they are
  // replaced by the reconnecting handlers near the end of this function.
  client.onclose = () => {
    console.log(`[${serverName}] Transport connection closed`);
  };
  client.onerror = (error) => {
    console.error(`[${serverName}] Transport connection error:`, error);
  };

  // Connects the client, failing with `timeoutMessage` after `timeoutMs`.
  // The timer is always cleared so it cannot keep the event loop alive once
  // the race has been decided.
  const connectWithTimeout = async (
    target: NonNullable<ReturnType<typeof selectTransport>>,
    timeoutMs: number,
    timeoutMessage: string,
  ): Promise<void> => {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeoutPromise = new Promise<never>((_, reject) => {
      timer = setTimeout(() => reject(new Error(timeoutMessage)), timeoutMs);
    });
    try {
      await Promise.race([client.connect(target), timeoutPromise]);
    } finally {
      clearTimeout(timer);
    }
  };

  // Best-effort close of a transport. `close()` may be asynchronous, so a
  // rejection handler is attached in addition to the synchronous try/catch;
  // otherwise a failed close would surface as an unhandled rejection.
  // The close is intentionally not awaited (fire-and-forget, as before).
  const closeTransportSafely = (
    target: ReturnType<typeof selectTransport>,
    warning: string,
  ): void => {
    try {
      void Promise.resolve(target?.close?.()).catch((closeError: unknown) => {
        console.warn(warning, closeError);
      });
    } catch (closeError) {
      console.warn(warning, closeError);
    }
  };

  try {
    // Guard the initial connection with a timeout so it cannot hang forever.
    await connectWithTimeout(transport, 10000, "Connection timeout");
  } catch (error) {
    console.error(`[${serverName}] Error connecting to server:`, error);
    // Make sure the transport is cleaned up when the connection fails.
    closeTransportSafely(
      transport,
      `[${serverName}] Error cleaning up transport:`,
    );
    return null;
  }
  console.log("client connected", transport);

  const capabilities = (await getServerCapabilities(client)) ?? {};
  console.log(`[${serverName}] capabilities:`, capabilities);

  // Snapshots of the upstream listings; `null` means "not fetched".
  const listOutputs = {
    tools: null as
      | Awaited<ReturnType<typeof client.listTools>>
      | undefined
      | null,
    prompts: null as
      | Awaited<ReturnType<typeof client.listPrompts>>
      | undefined
      | null,
    resources: null as
      | Awaited<ReturnType<typeof client.listResources>>
      | undefined
      | null,
    resourceTemplates: null as
      | Awaited<ReturnType<typeof client.listResourceTemplates>>
      | undefined
      | null,
  };

  // Fetch the tool list.
  try {
    const tools = await client.listTools();
    console.log(
      `[${serverName}] Registering tools:`,
      JSON.stringify(tools, null, 4),
    );
    listOutputs.tools = tools;
    capabilities.tools = {
      listChanged: true,
    };
  } catch (error) {
    console.error(`[${serverName}] Error listing tools:`, error);
    capabilities.tools = undefined;
  }

  // NOTE(review): getServerCapabilities is deliberately re-queried before each
  // section instead of being cached: `capabilities` is mutated above and
  // below, and the helper may return that same object — confirm before
  // hoisting these calls.
  if ((await getServerCapabilities(client))?.prompts) {
    // Fetch the prompt list.
    try {
      const prompts = await client.listPrompts();
      console.log(
        `[${serverName}] Registering prompts:`,
        JSON.stringify(prompts, null, 4),
      );
      listOutputs.prompts = prompts;
      capabilities.prompts = {
        listChanged: true,
      };
    } catch (error) {
      console.error(`[${serverName}] Error listing prompts:`, error);
      capabilities.prompts = undefined;
    }
  }

  if ((await getServerCapabilities(client))?.resources) {
    // Fetch the resource list.
    try {
      const resources = await client.listResources();
      console.log(
        `[${serverName}] Registering Resources:`,
        JSON.stringify(resources, null, 4),
      );
      listOutputs.resources = resources;
      capabilities.resources = {
        listChanged: true,
      };
    } catch (error) {
      console.error(`[${serverName}] Error listing Resources:`, error);
      capabilities.resources = undefined;
      // Keep the capability advertised if any resource listing succeeded.
      if (listOutputs.resources || listOutputs.resourceTemplates) {
        capabilities.resources = {};
      }
    }
  }

  if ((await getServerCapabilities(client))?.resources) {
    // Fetch the resource template list.
    try {
      const resourceTemplates = await client.listResourceTemplates();
      console.log(
        `[${serverName}] Registering ResourcesTemplates:`,
        JSON.stringify(resourceTemplates, null, 4),
      );
      listOutputs.resourceTemplates = resourceTemplates;
      capabilities.resources = {
        listChanged: true,
      };
    } catch (error: unknown) {
      console.error(`[${serverName}] Error listing ResourcesTemplates:`, error);
      // A request timeout is treated as fatal for the whole bridge setup.
      // NOTE(review): the client stays connected when this is rethrown —
      // confirm that the caller closes it.
      if (
        String(error).includes("McpError: MCP error -32001: Request timed out")
      ) {
        throw error;
      }
      capabilities.resources = undefined;
      // Keep the capability advertised if any resource listing succeeded.
      if (listOutputs.resources || listOutputs.resourceTemplates) {
        capabilities.resources = {};
      }
    }
  }

  // The bridge always advertises the tools capability, even if listing failed.
  const server = new McpServer(
    {
      name: `bridge-service-${serverName}`,
      version: "1.0.0",
    },
    {
      capabilities: Object.assign(capabilities, {
        tools: { listChanged: true },
      }),
    },
  );

  // Register tools.
  try {
    if (capabilities.tools && listOutputs.tools) {
      server.server.registerCapabilities({
        tools: { listChanged: true },
      });
      const tools = listOutputs.tools;
      await Promise.all(
        tools.tools.map(async (tool) => {
          console.log(
            `[${serverName}] Registering tool: `,
            JSON.stringify(
              {
                name: tool.name,
                description: tool.description,
                annotations: tool.annotations,
              },
              null,
              4,
            ),
          );
          // The upstream schemas are JSON Schema; registerTool is given the
          // `.shape` of the converted Zod schema.
          //@ts-ignore
          const inputSchema = JSONSchemaToZod.convert(tool.inputSchema).shape;
          const outputSchema = tool.outputSchema
            ? //@ts-ignore
              JSONSchemaToZod.convert(tool.outputSchema).shape
            : tool.outputSchema;
          server.registerTool(
            tool.name,
            {
              description: tool.description,
              annotations: tool.annotations,
              ...tool,
              inputSchema: inputSchema,
              outputSchema,
            },
            //@ts-ignore
            async (params: any) => {
              console.log(
                `[${serverName}] Calling tool`,
                JSON.stringify({ name: tool.name, params }, null, 4),
              );
              const result = await client.callTool({
                name: tool.name,
                arguments: params,
              });
              return result;
            },
          );
        }),
      );
    }
  } catch (error) {
    console.error(`[${serverName}] Error Registering tools:`, error);
  }

  // Register prompts.
  try {
    if (capabilities.prompts && listOutputs.prompts) {
      // Listing is answered from the snapshot taken during setup.
      //@ts-ignore
      server.server.setRequestHandler(ListPromptsRequestSchema, async () => {
        console.log(`[${serverName}] Listing prompts...`);
        return listOutputs.prompts;
      });
      server.server.setRequestHandler(
        GetPromptRequestSchema,
        async (request) => {
          console.log(
            `[${serverName}] Getting prompt...`,
            JSON.stringify(request.params, null, 4),
          );
          const result = await client.getPrompt(request.params);
          return result;
        },
      );
    }
  } catch (error) {
    console.error(`[${serverName}] Error Registering prompts:`, error);
  }

  // Register resources.
  try {
    if (capabilities.resources && listOutputs.resources) {
      server.server.setRequestHandler(
        ReadResourceRequestSchema,
        async (request) => {
          console.log(
            `[${serverName}] Reading resource...`,
            JSON.stringify(request.params, null, 4),
          );
          const result = await client.readResource(request.params);
          return result;
        },
      );
      // Both listings are answered from the snapshots taken during setup.
      server.server.setRequestHandler(
        ListResourcesRequestSchema,
        //@ts-ignore
        async (request) => {
          console.log(
            `[${serverName}] Listing resources...`,
            JSON.stringify(request.params, null, 4),
          );
          return listOutputs.resources;
        },
      );
      server.server.setRequestHandler(
        ListResourceTemplatesRequestSchema,
        //@ts-ignore
        async (request) => {
          console.log(
            `[${serverName}] Listing resourceTemplates...`,
            JSON.stringify(request.params, null, 4),
          );
          return listOutputs.resourceTemplates;
        },
      );
    }
  } catch (error) {
    console.error(`[${serverName}] Error Registering Resources:`, error);
  }

  console.log("getServerCapabilities", client.getServerCapabilities());
  // Forward log-level changes only when the upstream server supports logging.
  if (client.getServerCapabilities()?.logging) {
    server.server.setRequestHandler(SetLevelRequestSchema, async (args) => {
      console.log(
        `[${serverName}] Setting logging level...`,
        JSON.stringify(args.params, null, 4),
      );
      return await client.setLoggingLevel(args.params.level);
    });
  }

  // Tool listing and tool calls are always forwarded live to the upstream
  // client; these handlers are installed after (and therefore take the place
  // of) whatever registerTool set up for the same request schemas.
  server.server.setRequestHandler(ListToolsRequestSchema, async (request) => {
    console.log(
      `[${serverName}] Listing tools...`,
      JSON.stringify(request.params, null, 4),
    );
    const tools = await client.listTools(request.params);
    return tools;
  });
  server.server.setRequestHandler(CallToolRequestSchema, async (request) => {
    console.log(
      `[${serverName}] Calling tool...`,
      JSON.stringify(request.params, null, 4),
    );
    const result = await client.callTool(request.params);
    return result;
  });

  // Wire up all notification handlers.
  setupAllNotificationHandlers(client, server, serverName);

  // Log client errors instead of letting them propagate; error-recovery
  // logic could be added here.
  client.onerror = (error) => {
    console.error(`[${serverName}] Client error:`, error);
  };

  // True while a reconnect cycle is running. Closing a discarded transport
  // during that cycle may itself surface as another client close event
  // (NOTE(review): the SDK appears to chain transport.onclose into the
  // client — confirm per transport type); without this guard each such event
  // would start an additional, overlapping cycle with a fresh retry counter.
  let reconnecting = false;

  client.onclose = () => {
    console.log(`[${serverName}] Connection closed`);
    if (reconnecting) {
      return;
    }
    reconnecting = true;

    let retryCount = 0;
    const maxRetries = 3;
    const retryDelay = 1000; // base delay: 1 second

    const tryReconnect = async (): Promise<void> => {
      try {
        console.log(
          `[${serverName}] Attempting to reconnect... (attempt ${
            retryCount + 1
          }/${maxRetries})`,
        );
        // Dispose of the previous transport before creating a new one.
        closeTransportSafely(
          transport,
          `[${serverName}] Error closing old transport:`,
        );
        transport = selectTransport(serverConfig);
        if (!transport) {
          console.error(
            `[${serverName}] Failed to create transport, please check the configuration.`,
          );
          reconnecting = false;
          return;
        }
        console.log("transport", transport);
        transport.onclose = () => {
          console.log(`[${serverName}] Transport connection closed`);
        };
        transport.onerror = (error) => {
          console.error(`[${serverName}] Transport connection error:`, error);
        };
        // Reconnect with a (shorter) timeout.
        await connectWithTimeout(transport, 5000, "Reconnection timeout");
        console.log(`[${serverName}] Reconnected successfully`);
        reconnecting = false;
      } catch (error) {
        retryCount++;
        console.error(
          `[${serverName}] Reconnection attempt ${retryCount} failed:`,
          error,
        );
        // Make sure the transport is cleaned up when reconnecting fails.
        closeTransportSafely(
          transport,
          `[${serverName}] Error cleaning up transport during reconnect:`,
        );
        if (retryCount < maxRetries) {
          // Linear backoff: 1s, 2s, ... The log reports the delay actually used.
          const delay = retryDelay * retryCount;
          console.log(`[${serverName}] Retrying in ${delay}ms...`);
          setTimeout(startAttempt, delay);
        } else {
          console.error(
            `[${serverName}] Maximum reconnection attempts (${maxRetries}) reached. Giving up.`,
          );
          // A notification could be raised here, or the server marked as
          // unavailable.
          reconnecting = false;
        }
      }
    };

    // Runs one attempt and makes sure an unexpected rejection is logged
    // rather than becoming an unhandled promise rejection.
    const startAttempt = (): void => {
      tryReconnect().catch((error: unknown) => {
        console.error(
          `[${serverName}] Error during reconnection setup:`,
          error,
        );
        reconnecting = false;
      });
    };

    // Defer the first attempt so it runs outside the close callback itself.
    setImmediate(startAttempt);
  };

  // NOTE(review): `transport` below is the transport at return time; after a
  // reconnect the local variable is replaced but this returned field is not
  // updated.
  return {
    config: serverConfig,
    server,
    client,
    transport: transport,
    httpTransports: null as any,
  };
}