import fs from "fs";
import path from "path";
import { fileURLToPath } from "url";
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
CallToolRequestSchema,
ErrorCode,
ListToolsRequestSchema,
McpError,
} from "@modelcontextprotocol/sdk/types.js";
import { z } from "zod";
// Derive this module's file path and containing directory from import.meta.url;
// __dirname is used below to locate config.json, the Swagger file and cache files.
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
/**
 * Parse the process command-line arguments into a key/value map.
 *
 * Recognized forms:
 *   --key=value  -> params.key = "value" (the value may itself contain "=")
 *   --key value  -> params.key = "value"
 *   --key        -> params.key = true (boolean flag)
 * Tokens that do not start with "--" and were not consumed as a value are ignored.
 *
 * @returns {Object<string, string|boolean>} Parsed options keyed by option name.
 */
function parseCommandLineArgs() {
  const tokens = process.argv.slice(2);
  const params = {};
  let index = 0;

  while (index < tokens.length) {
    const token = tokens[index];
    index += 1;

    if (!token.startsWith("--")) {
      continue;
    }

    const option = token.slice(2);
    const equalsAt = option.indexOf("=");

    // --key=value: everything after the first "=" belongs to the value.
    if (equalsAt !== -1) {
      params[option.slice(0, equalsAt)] = option.slice(equalsAt + 1);
      continue;
    }

    // --key value: consume the following token unless it is another option.
    const candidate = tokens[index];
    if (candidate && !candidate.startsWith("--")) {
      params[option] = candidate;
      index += 1;
    } else {
      params[option] = true;
    }
  }

  return params;
}
/**
 * Check whether the given text is an absolute URL using the http or https scheme.
 *
 * @param {string} string - Candidate URL text.
 * @returns {boolean} true when the text parses as a URL whose protocol is
 *   "http:" or "https:"; false otherwise (including when parsing fails).
 */
function isValidUrl(string) {
  let parsed;
  try {
    parsed = new URL(string);
  } catch {
    return false;
  }
  return ["http:", "https:"].includes(parsed.protocol);
}
/**
 * Download and parse a Swagger document from a remote URL.
 *
 * The timeout covers the whole operation: connecting, receiving headers AND
 * reading/parsing the body. The timer is cleared in `finally`, so it is always
 * released exactly once regardless of how the function exits.
 *
 * @param {string} url - Absolute URL of the Swagger JSON document.
 * @param {number} [timeout=10000] - Overall time limit in milliseconds; a
 *   null/undefined value falls back to 10000.
 * @returns {Promise<any>} The parsed JSON body.
 * @throws {Error} On timeout, non-2xx status, network failure or invalid JSON.
 *   The underlying error is attached as `cause`.
 */
async function fetchSwaggerFromUrl(url, timeout = 10000) {
  // A null timeout (e.g. from a config file) would otherwise abort almost immediately.
  const timeoutMs = timeout ?? 10000;
  const controller = new AbortController();
  const timeoutId = setTimeout(() => controller.abort(), timeoutMs);

  try {
    console.error(`🌐 正在从远程URL获取Swagger文档: ${url}`);
    const response = await fetch(url, {
      method: "GET",
      headers: {
        Accept: "application/json",
        "User-Agent": "Swagger-MCP-Server/1.0.0",
      },
      signal: controller.signal,
    });

    if (!response.ok) {
      throw new Error(`HTTP错误: ${response.status} ${response.statusText}`);
    }

    // A non-JSON content type is only a warning; parsing is still attempted.
    const contentType = response.headers.get("content-type");
    if (!contentType || !contentType.includes("application/json")) {
      console.warn(`⚠️ 警告: 响应内容类型不是JSON: ${contentType}`);
    }

    // Still guarded by the abort timer, so a stalled body cannot hang forever.
    const swaggerDoc = await response.json();
    console.error(`✅ 成功从远程URL获取Swagger文档`);
    return swaggerDoc;
  } catch (error) {
    if (error.name === "AbortError") {
      throw new Error(`远程URL请求超时 (${timeoutMs}ms)`, { cause: error });
    }
    throw new Error(`无法从远程URL获取Swagger文档: ${error.message}`, {
      cause: error,
    });
  } finally {
    clearTimeout(timeoutId);
  }
}
// Parse the command-line arguments once at startup.
const cmdArgs = parseCommandLineArgs();

/**
 * Build a fresh copy of the built-in default configuration.
 *
 * @returns {{server: object, swagger: object, http: object, logging: object}}
 *   A new object on every call, so callers may modify it freely.
 */
function createDefaultConfig() {
  return {
    server: { name: "swagger-mcp-server", version: "1.0.0" },
    swagger: { filePath: "./swagger.json", validateSchema: true },
    http: {
      timeout: 30000,
      userAgent: "Swagger-MCP-Server/1.0.0",
      maxRetries: 3,
      retryDelay: 1000,
    },
    logging: { level: "info", enableConsole: true },
  };
}

// Load config.json from the directory above this file.
// Values from the file are layered over the defaults section by section, so a
// file that omits a section (or a key inside one) no longer leaves fields such
// as config.http.userAgent or config.logging.enableConsole undefined.
// If the file is missing, unreadable or not a JSON object, the defaults are used.
let config = createDefaultConfig();
try {
  const configPath = path.join(__dirname, "..", "config.json");
  const loaded = JSON.parse(fs.readFileSync(configPath, "utf-8"));
  if (loaded === null || typeof loaded !== "object" || Array.isArray(loaded)) {
    throw new Error("config.json的内容必须是JSON对象");
  }

  // Keep any extra top-level keys from the file, then merge the known sections.
  const merged = { ...loaded };
  for (const [section, defaults] of Object.entries(config)) {
    const overrides = loaded[section];
    const isSection =
      overrides !== null &&
      typeof overrides === "object" &&
      !Array.isArray(overrides);
    merged[section] = isSection ? { ...defaults, ...overrides } : defaults;
  }
  config = merged;
} catch (error) {
  console.error("无法读取config.json文件,使用默认配置:", error.message);
}
/**
 * Validate that a parsed document looks like a Swagger 2.0 specification.
 *
 * Validation is skipped entirely when `config.swagger.validateSchema` is falsy.
 * Otherwise the document must be an object with a string `swagger` version
 * starting with "2.", a non-empty `info.title`, and an object-valued `paths`.
 *
 * @param {any} swaggerDoc - Parsed document to check.
 * @returns {void}
 * @throws {Error} With a descriptive message when a check fails.
 */
function validateSwaggerDoc(swaggerDoc) {
  if (!config.swagger.validateSchema) {
    return;
  }

  // Guard the type first: a null document or a numeric version (e.g. `2`)
  // must produce the validation error, not a TypeError from startsWith.
  const isObject = swaggerDoc !== null && typeof swaggerDoc === "object";
  if (
    !isObject ||
    typeof swaggerDoc.swagger !== "string" ||
    !swaggerDoc.swagger.startsWith("2.")
  ) {
    throw new Error("不是有效的Swagger 2.0文档");
  }

  if (!swaggerDoc.info || !swaggerDoc.info.title) {
    throw new Error("缺少必需的info.title字段");
  }

  // `paths` is a map from path template to path item; an array is not valid.
  const { paths } = swaggerDoc;
  if (!paths || typeof paths !== "object" || Array.isArray(paths)) {
    throw new Error("缺少必需的paths字段");
  }
}
/**
 * Load the Swagger document from the configured source and validate it.
 *
 * Source selection: when `--url` or `--swagger` was given on the command line
 * the document is fetched remotely (requires `config.swagger.allowRemoteUrl`),
 * and optionally written to a timestamped cache file next to config.json when
 * `config.swagger.cacheRemoteSpec` is set. Otherwise it is read from
 * `config.swagger.filePath`, resolved relative to the directory above this file.
 *
 * @returns {Promise<object>} The parsed and validated Swagger document.
 * @throws {Error} If the URL is invalid, remote loading is disabled, the local
 *   file is missing, or fetching/parsing/validation fails.
 */
async function loadSwaggerDoc() {
  const remoteUrl = cmdArgs.url || cmdArgs.swagger;

  // Remote source: validate, fetch, then best-effort cache.
  const loadRemote = async () => {
    if (!isValidUrl(remoteUrl)) {
      throw new Error(`无效的URL格式: ${remoteUrl}`);
    }
    if (!config.swagger.allowRemoteUrl) {
      throw new Error(
        "配置不允许使用远程URL,请在config.json中设置allowRemoteUrl为true"
      );
    }

    const fetched = await fetchSwaggerFromUrl(
      remoteUrl,
      config.swagger.urlTimeout
    );

    if (config.swagger.cacheRemoteSpec) {
      // A failed cache write is reported but never fails the load.
      try {
        const cacheFileName = `swagger-cache-${Date.now()}.json`;
        const cachePath = path.join(__dirname, "..", cacheFileName);
        fs.writeFileSync(cachePath, JSON.stringify(fetched, null, 2));
        console.error(`💾 已缓存远程Swagger文档到: ${cacheFileName}`);
      } catch (cacheError) {
        console.warn(`⚠️ 无法缓存远程文档: ${cacheError.message}`);
      }
    }
    return fetched;
  };

  // Local source: read and parse the configured file.
  const loadLocal = () => {
    const swaggerPath = path.resolve(__dirname, "..", config.swagger.filePath);
    if (!fs.existsSync(swaggerPath)) {
      throw new Error(`本地Swagger文件不存在: ${swaggerPath}`);
    }
    const parsed = JSON.parse(fs.readFileSync(swaggerPath, "utf-8"));
    console.error(`📁 从本地文件加载Swagger文档: ${swaggerPath}`);
    return parsed;
  };

  const swaggerDoc = remoteUrl ? await loadRemote() : loadLocal();

  validateSwaggerDoc(swaggerDoc);

  const versionLabel = swaggerDoc.info.version || "未知版本";
  console.error(
    `✅ 成功加载Swagger文档: ${swaggerDoc.info.title} v${versionLabel}`
  );
  return swaggerDoc;
}
/**
 * Convert a Swagger 2.0 parameter (or items) definition into a JSON Schema fragment.
 *
 * Copies `description`, `enum`, numeric bounds and string-length bounds when
 * present, and converts `items` recursively for arrays. Unknown or missing
 * types fall back to "string".
 *
 * @param {object} swaggerParam - Swagger parameter or `items` object.
 * @returns {object} JSON Schema object with at least a `type` property.
 */
function swaggerTypeToJsonSchema(swaggerParam) {
  // A Map is used instead of a plain object so that type names such as
  // "constructor" or "toString" cannot resolve to inherited Object.prototype
  // members and end up as the schema type.
  const typeMapping = new Map([
    ["string", "string"],
    ["integer", "number"],
    ["number", "number"],
    ["boolean", "boolean"],
    ["array", "array"],
    ["object", "object"],
    ["file", "string"], // file uploads are represented as strings
  ]);

  const schema = {
    type: typeMapping.get(swaggerParam.type) ?? "string",
  };

  if (swaggerParam.description) {
    schema.description = swaggerParam.description;
  }

  if (swaggerParam.enum) {
    schema.enum = swaggerParam.enum;
  }

  if (swaggerParam.type === "array" && swaggerParam.items) {
    schema.items = swaggerTypeToJsonSchema(swaggerParam.items);
  }

  // Constraints that are copied through unchanged; 0 is a valid bound, so the
  // check is against undefined rather than truthiness.
  const passthroughKeywords = ["minimum", "maximum", "minLength", "maxLength"];
  for (const keyword of passthroughKeywords) {
    if (swaggerParam[keyword] !== undefined) {
      schema[keyword] = swaggerParam[keyword];
    }
  }

  return schema;
}
/**
 * Build the MCP tool definitions exposed by this server.
 *
 * Three tools are described: `api_call` (generic request), `api_list_endpoints`
 * (endpoint listing with optional filters) and `api_get_endpoint_info`
 * (details of one endpoint). The API title is read from `global.swaggerDoc`.
 *
 * @returns {Array<{name: string, description: string, inputSchema: object}>}
 *   Tool descriptors with JSON Schema input definitions.
 */
function createUnifiedApiTools() {
  const { title } = global.swaggerDoc.info;

  // Each schema gets its own array/object instances so none are shared.
  const httpMethodProperty = (description) => ({
    type: "string",
    enum: ["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"],
    description,
  });
  const freeFormObject = (description) => ({
    type: "object",
    description,
    additionalProperties: true,
  });

  const apiCallTool = {
    name: "api_call",
    description: `调用${title} API的通用工具。支持所有HTTP方法和路径。`,
    inputSchema: {
      type: "object",
      properties: {
        method: httpMethodProperty("HTTP方法"),
        path: {
          type: "string",
          description: "API路径,例如: /users/{id} 或 /posts",
        },
        pathParams: freeFormObject('路径参数,例如: {"id": "123"}'),
        queryParams: freeFormObject('查询参数,例如: {"limit": 10, "offset": 0}'),
        headers: freeFormObject('请求头,例如: {"Authorization": "Bearer token"}'),
        body: freeFormObject("请求体数据(用于POST/PUT等方法)"),
      },
      required: ["method", "path"],
    },
  };

  const listEndpointsTool = {
    name: "api_list_endpoints",
    description: `列出${title} API的所有可用端点`,
    inputSchema: {
      type: "object",
      properties: {
        filter: {
          type: "string",
          description: "过滤端点的关键词(可选)",
        },
        method: httpMethodProperty("按HTTP方法过滤(可选)"),
      },
    },
  };

  const endpointInfoTool = {
    name: "api_get_endpoint_info",
    description: `获取${title} API特定端点的详细信息`,
    inputSchema: {
      type: "object",
      properties: {
        method: httpMethodProperty("HTTP方法"),
        path: {
          type: "string",
          description: "API路径",
        },
      },
      required: ["method", "path"],
    },
  };

  return [apiCallTool, listEndpointsTool, endpointInfoTool];
}
/**
 * Build the URL, headers and body for a unified API call.
 *
 * The URL is `global.baseUrl` + `path`, with every `{name}` placeholder
 * replaced by the URL-encoded path parameter and non-null query parameters
 * appended as a query string. A default User-Agent (from `config.http.userAgent`)
 * and, when a body is sent, a default JSON Content-Type are added unless the
 * caller supplied those headers under any letter case.
 *
 * @param {string} method - HTTP method; matched case-insensitively.
 * @param {string} path - API path template, e.g. "/users/{id}".
 * @param {object} [pathParams={}] - Values for `{name}` placeholders.
 * @param {object} [queryParams={}] - Query parameters; null/undefined values are skipped.
 * @param {object} [headers={}] - Extra request headers.
 * @param {object|string|null} [body=null] - Request body; only sent for
 *   POST, PUT and PATCH. Non-string bodies are JSON-serialized.
 * @returns {{url: string, headers: object, body: (string|null)}}
 */
function buildUnifiedApiRequest(
  method,
  path,
  pathParams = {},
  queryParams = {},
  headers = {},
  body = null
) {
  const baseUrl = global.baseUrl;

  // Default parameters only cover `undefined`; callers may also pass null.
  const pathValues = pathParams ?? {};
  const queryValues = queryParams ?? {};
  const suppliedHeaders = headers ?? {};

  // Substitute every occurrence of each placeholder, not just the first one.
  let url = baseUrl + path;
  for (const [key, value] of Object.entries(pathValues)) {
    url = url.replaceAll(`{${key}}`, encodeURIComponent(value));
  }

  const query = new URLSearchParams();
  for (const [key, value] of Object.entries(queryValues)) {
    if (value !== undefined && value !== null) {
      query.append(key, value);
    }
  }
  const queryString = query.toString();
  if (queryString) {
    url += "?" + queryString;
  }

  // HTTP header names are case-insensitive, so detect caller-supplied headers
  // regardless of case to avoid sending the same header twice.
  const suppliedNames = new Set(
    Object.keys(suppliedHeaders).map((name) => name.toLowerCase())
  );
  const requestHeaders = {
    ...(suppliedNames.has("user-agent")
      ? {}
      : { "User-Agent": config.http.userAgent }),
    ...suppliedHeaders,
  };

  // Normalize the method so "post" and "POST" are treated the same; callers
  // upper-case the method when sending, so the body must not be dropped here.
  const upperMethod = String(method).toUpperCase();
  const methodAllowsBody =
    upperMethod === "POST" || upperMethod === "PUT" || upperMethod === "PATCH";

  let requestBody = null;
  if (body && methodAllowsBody) {
    requestBody = typeof body === "string" ? body : JSON.stringify(body);
    if (!suppliedNames.has("content-type")) {
      requestHeaders["Content-Type"] = "application/json";
    }
  }

  return { url, headers: requestHeaders, body: requestBody };
}
/**
 * List the operations declared in `global.swaggerDoc.paths`.
 *
 * Only keys that are HTTP methods are treated as operations. Other path-item
 * members (path-level `parameters`, `$ref`, `x-` extensions) are skipped, so
 * they no longer appear as bogus endpoints.
 *
 * @param {string} [filter=""] - Case-insensitive keyword matched against the
 *   endpoint's method, path, summary, description, operationId and tags.
 *   Field names themselves are not searched.
 * @param {string} [methodFilter=""] - HTTP method to keep (case-insensitive).
 * @returns {Array<{method: string, path: string, summary: string,
 *   description: string, operationId: string, tags: Array}>}
 */
function listApiEndpoints(filter = "", methodFilter = "") {
  const swaggerDoc = global.swaggerDoc;
  const httpMethods = new Set([
    "get",
    "put",
    "post",
    "delete",
    "options",
    "head",
    "patch",
  ]);
  const keyword = String(filter ?? "").toLowerCase();
  const wantedMethod = String(methodFilter ?? "").toUpperCase();
  const endpoints = [];

  for (const [path, pathItem] of Object.entries(swaggerDoc.paths)) {
    if (!pathItem || typeof pathItem !== "object") continue;

    for (const [method, operation] of Object.entries(pathItem)) {
      if (!httpMethods.has(method.toLowerCase())) continue;
      if (!operation || typeof operation !== "object") continue;

      const endpoint = {
        method: method.toUpperCase(),
        path: path,
        summary: operation.summary || "",
        description: operation.description || "",
        operationId: operation.operationId || "",
        tags: operation.tags || [],
      };

      if (wantedMethod && endpoint.method !== wantedMethod) {
        continue;
      }

      if (keyword) {
        // Search the values only; serializing the whole object would also
        // match key names such as "summary" for every endpoint.
        const searchable = [
          endpoint.method,
          endpoint.path,
          endpoint.summary,
          endpoint.description,
          endpoint.operationId,
        ]
          .concat(endpoint.tags)
          .join("\n")
          .toLowerCase();
        if (!searchable.includes(keyword)) {
          continue;
        }
      }

      endpoints.push(endpoint);
    }
  }

  return endpoints;
}
/**
 * Look up one operation in `global.swaggerDoc` and summarize it.
 *
 * Parameters declared at path level apply to every operation under that path,
 * so they are merged with the operation's own parameters. An operation-level
 * parameter with the same `in` + `name` replaces the path-level one.
 *
 * @param {string} method - HTTP method (case-insensitive).
 * @param {string} path - Path template exactly as it appears in the document.
 * @returns {{method: string, path: string, summary: string, description: string,
 *   operationId: string, tags: Array, parameters: Array<object>, responses: object}}
 * @throws {Error} If the path or method is not declared in the document.
 */
function getEndpointInfo(method, path) {
  const swaggerDoc = global.swaggerDoc;
  const httpMethods = new Set([
    "get",
    "put",
    "post",
    "delete",
    "options",
    "head",
    "patch",
  ]);
  const methodKey = method.toLowerCase();

  // Own-property lookups only, restricted to real HTTP methods, so that keys
  // like "parameters" or inherited members are never taken for an operation.
  const pathItem = Object.hasOwn(swaggerDoc.paths, path)
    ? swaggerDoc.paths[path]
    : undefined;
  const operation =
    pathItem &&
    typeof pathItem === "object" &&
    httpMethods.has(methodKey) &&
    Object.hasOwn(pathItem, methodKey)
      ? pathItem[methodKey]
      : undefined;

  if (!operation || typeof operation !== "object") {
    throw new Error(`端点 ${method.toUpperCase()} ${path} 不存在`);
  }

  // Merge path-level then operation-level parameters. Re-setting an existing
  // Map key keeps its position and swaps in the overriding definition.
  const pathLevel = Array.isArray(pathItem.parameters) ? pathItem.parameters : [];
  const operationLevel = operation.parameters || [];
  const merged = new Map();
  for (const param of [...pathLevel, ...operationLevel]) {
    if (!param || typeof param !== "object") continue;
    // Reference objects have no name/in of their own; key them by their $ref.
    const key = param.$ref ?? `${param.in}:${param.name}`;
    merged.set(key, param);
  }

  return {
    method: method.toUpperCase(),
    path: path,
    summary: operation.summary || "",
    description: operation.description || "",
    operationId: operation.operationId || "",
    tags: operation.tags || [],
    parameters: [...merged.values()].map((param) => ({
      name: param.name,
      in: param.in,
      type: param.type,
      required: param.required || false,
      description: param.description || "",
    })),
    responses: operation.responses || {},
  };
}
/**
 * Wait for the given number of milliseconds.
 *
 * @param {number} ms - Delay passed to setTimeout.
 * @returns {Promise<void>} Resolves (with no value) once the timer fires.
 */
function delay(ms) {
  return new Promise((wake) => {
    setTimeout(wake, ms);
  });
}
/**
 * Ensure every parameter the operation marks as required has a value in `args`.
 *
 * A value counts as missing only when it is `undefined` or `null`; other falsy
 * values such as 0, "" or false are accepted.
 *
 * @param {object} operation - Swagger operation; its `parameters` array is optional.
 * @param {object} args - Argument values keyed by parameter name.
 * @returns {void}
 * @throws {McpError} InvalidParams listing all missing parameter names.
 */
function validateRequiredParameters(operation, args) {
  const declared = operation.parameters || [];
  const missingNames = declared
    .filter((param) => param.required && args[param.name] == null)
    .map((param) => param.name);

  if (missingNames.length === 0) {
    return;
  }

  throw new McpError(
    ErrorCode.InvalidParams,
    `缺少必需参数: ${missingNames.join(", ")}`
  );
}
/**
 * Execute the `api_call` tool: send one HTTP request, retrying on failure.
 *
 * At least one attempt is always made, even if `config.http.maxRetries` is
 * missing, zero or not a number. Each attempt is limited by
 * `config.http.timeout`, which covers both the request and reading the
 * response body. Between attempts the function waits
 * `config.http.retryDelay * attempt` milliseconds. Any HTTP status is returned
 * to the caller as a result; only thrown errors trigger a retry.
 *
 * @param {object} args - Tool arguments: `method`, `path` and optional
 *   `pathParams`, `queryParams`, `headers`, `body`.
 * @returns {Promise<{content: Array<{type: string, text: string}>}>}
 *   Text content with status, content type and response data.
 * @throws {McpError} InvalidParams when method/path are missing; InternalError
 *   when every attempt fails.
 */
async function executeUnifiedApiCall(args) {
  const {
    method,
    path,
    pathParams = {},
    queryParams = {},
    headers = {},
    body,
  } = args ?? {};

  if (!method || !path) {
    throw new McpError(ErrorCode.InvalidParams, "method和path参数是必需的");
  }

  // Guarantee one attempt so `lastError` is always set if the loop falls through.
  const maxAttempts = Math.max(
    1,
    Math.trunc(Number(config.http.maxRetries)) || 1
  );
  let lastError;

  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      const {
        url,
        headers: requestHeaders,
        body: requestBody,
      } = buildUnifiedApiRequest(
        method,
        path,
        pathParams,
        queryParams,
        headers,
        body
      );

      if (config.logging.enableConsole) {
        console.error(`🚀 发起API调用: ${method.toUpperCase()} ${url}`);
      }

      const controller = new AbortController();
      const timeoutId = setTimeout(
        () => controller.abort(),
        config.http.timeout
      );

      try {
        const response = await fetch(url, {
          method: method.toUpperCase(),
          headers: requestHeaders,
          body: requestBody,
          signal: controller.signal,
        });

        // The abort timer is still armed here, so a stalled body times out too.
        const contentType = response.headers.get("content-type");
        const isJson = contentType && contentType.includes("application/json");
        const responseData = isJson
          ? await response.json()
          : await response.text();

        if (config.logging.enableConsole) {
          console.error(
            `✅ API调用成功: ${response.status} ${response.statusText}`
          );
        }

        return {
          content: [
            {
              type: "text",
              text: `状态码: ${response.status} ${
                response.statusText
              }\n内容类型: ${
                contentType || "unknown"
              }\n响应数据: ${JSON.stringify(responseData, null, 2)}`,
            },
          ],
        };
      } catch (fetchError) {
        if (fetchError?.name === "AbortError") {
          throw new Error(`请求超时 (${config.http.timeout}ms)`, {
            cause: fetchError,
          });
        }
        throw fetchError;
      } finally {
        // Released before any retry delay starts.
        clearTimeout(timeoutId);
      }
    } catch (error) {
      lastError = error;
      if (config.logging.enableConsole) {
        console.error(
          `🔄 API调用失败 (尝试 ${attempt}/${maxAttempts}):`,
          error.message
        );
      }
      // Protocol-level errors are not retried; neither is the final attempt.
      if (attempt === maxAttempts || error instanceof McpError) {
        break;
      }
      await delay(config.http.retryDelay * attempt);
    }
  }

  if (lastError instanceof McpError) {
    throw lastError;
  }
  throw new McpError(
    ErrorCode.InternalError,
    `API调用失败: ${lastError.message}`
  );
}
/**
 * Handle the `api_list_endpoints` tool: format the matching endpoints as text.
 *
 * @param {object} [args] - Optional `filter` keyword and `method` filter.
 * @returns {Promise<{content: Array<{type: string, text: string}>}>}
 *   One text item with a count header followed by one entry per endpoint.
 * @throws {McpError} InternalError when listing or formatting fails.
 */
async function listEndpoints(args) {
  const { filter = "", method = "" } = args || {};

  // One entry: "METHOD path", then summary/description, then the tag list.
  const formatEntry = (ep) => {
    const blurb = ep.summary || ep.description || "无描述";
    const tagList = ep.tags.join(", ") || "无";
    return `${ep.method} ${ep.path}\n ${blurb}\n 标签: ${tagList}`;
  };

  try {
    const endpoints = listApiEndpoints(filter, method);
    const listing = endpoints.map(formatEntry).join("\n\n");
    const text = `找到 ${endpoints.length} 个API端点:\n\n${listing}`;
    return { content: [{ type: "text", text }] };
  } catch (error) {
    throw new McpError(
      ErrorCode.InternalError,
      `获取端点列表失败: ${error.message}`
    );
  }
}
/**
 * Handle the `api_get_endpoint_info` tool: describe one endpoint as text.
 *
 * @param {object} [args] - Must contain `method` and `path`.
 * @returns {Promise<{content: Array<{type: string, text: string}>}>}
 *   One text item with summary, description, operation id, tags, parameters
 *   and the JSON-formatted responses.
 * @throws {McpError} InvalidParams when method or path is missing;
 *   InternalError when the lookup or formatting fails.
 */
async function getEndpointDetails(args) {
  const { method, path } = args || {};
  if (!method || !path) {
    throw new McpError(ErrorCode.InvalidParams, "method和path参数是必需的");
  }

  // One parameter: name, location, type and required flag, then its description.
  const formatParameter = (p) => {
    const requirement = p.required ? "[必需]" : "[可选]";
    const note = p.description || "无描述";
    return ` - ${p.name} (${p.in}): ${p.type} ${requirement}\n ${note}`;
  };

  try {
    const info = getEndpointInfo(method, path);
    const paramInfo = info.parameters.map(formatParameter).join("\n");

    const lines = [
      `端点详情: ${info.method} ${info.path}`,
      "",
      `摘要: ${info.summary}`,
      `描述: ${info.description}`,
      `操作ID: ${info.operationId}`,
      `标签: ${info.tags.join(", ")}`,
      "",
      "参数:",
      paramInfo || " 无参数",
      "",
      `响应: ${JSON.stringify(info.responses, null, 2)}`,
    ];

    return { content: [{ type: "text", text: lines.join("\n") }] };
  } catch (error) {
    throw new McpError(
      ErrorCode.InternalError,
      `获取端点详情失败: ${error.message}`
    );
  }
}
/**
 * Execute a single API call identified by a per-endpoint tool name.
 *
 * The tool name has the form `<METHOD>_<path>`, where the path has its braces
 * removed and every "/" replaced by "_" (e.g. "POST__users_id" for
 * POST /users/{id}). `args` is a flat map of parameter name to value; each
 * value is routed to the path, query string, headers or body according to the
 * `in` field of the matching Swagger parameter declaration.
 *
 * The request is built with buildUnifiedApiRequest. The previous version
 * called a `buildRequestUrl` helper that is not defined in this file, so every
 * call failed.
 * NOTE(review): the flat `args` shape is inferred from how
 * validateRequiredParameters reads `args[param.name]` — confirm against callers.
 *
 * @param {string} toolName - Tool name derived from method and path.
 * @param {object} [args] - Parameter values keyed by parameter name.
 * @returns {Promise<{content: Array<{type: string, text: string}>}>}
 *   Text content with status, content type and response data.
 * @throws {McpError} InvalidRequest when no endpoint matches the tool name;
 *   InvalidParams (from validation) for missing required parameters;
 *   InternalError for any other failure, including timeouts.
 */
async function performApiCall(toolName, args) {
  try {
    const swaggerDoc = global.swaggerDoc;
    const callArgs = args ?? {};

    // The HTTP method is the part of the tool name before the first "_".
    const method = toolName.split("_")[0].toLowerCase();
    const upperMethod = method.toUpperCase();

    // Find the path whose derived tool name equals the requested one.
    let targetPath = null;
    let targetPathItem = null;
    let targetOperation = null;
    for (const [path, pathItem] of Object.entries(swaggerDoc.paths)) {
      if (!pathItem || !pathItem[method]) continue;
      const pathToolName = `${upperMethod}_${path
        .replace(/[{}]/g, "")
        .replace(/\//g, "_")}`;
      if (pathToolName === toolName) {
        targetPath = path;
        targetPathItem = pathItem;
        targetOperation = pathItem[method];
        break;
      }
    }
    if (!targetPath || !targetOperation) {
      throw new McpError(ErrorCode.InvalidRequest, `未找到工具: ${toolName}`);
    }

    validateRequiredParameters(targetOperation, callArgs);

    // Route each supplied argument by the location its declaration specifies.
    const pathParams = {};
    const queryParams = {};
    const headerParams = {};
    const formParams = {};
    let body = null;
    const declaredParams = [
      ...(Array.isArray(targetPathItem.parameters)
        ? targetPathItem.parameters
        : []),
      ...(targetOperation.parameters || []),
    ];
    for (const param of declaredParams) {
      const value = callArgs[param?.name];
      if (value === undefined || value === null) continue;
      switch (param.in) {
        case "path":
          pathParams[param.name] = value;
          break;
        case "query":
          queryParams[param.name] = value;
          break;
        case "header":
          headerParams[param.name] = String(value);
          break;
        case "body":
          body = value;
          break;
        case "formData":
          formParams[param.name] = value;
          break;
        default:
          break;
      }
    }

    // Form fields are sent URL-encoded when there is no explicit body.
    // File uploads (multipart) are not supported here.
    if (body === null && Object.keys(formParams).length > 0) {
      body = new URLSearchParams(
        Object.entries(formParams).map(([key, value]) => [key, String(value)])
      ).toString();
      const hasContentType = Object.keys(headerParams).some(
        (name) => name.toLowerCase() === "content-type"
      );
      if (!hasContentType) {
        headerParams["Content-Type"] = "application/x-www-form-urlencoded";
      }
    }

    // The method is passed upper-cased because the builder decides whether to
    // attach a body based on it.
    const {
      url,
      headers: requestHeaders,
      body: requestBody,
    } = buildUnifiedApiRequest(
      upperMethod,
      targetPath,
      pathParams,
      queryParams,
      headerParams,
      body
    );

    if (config.logging.enableConsole) {
      console.error(`🚀 发起API调用: ${upperMethod} ${url}`);
    }

    const controller = new AbortController();
    const timeoutId = setTimeout(() => controller.abort(), config.http.timeout);
    try {
      const response = await fetch(url, {
        method: upperMethod,
        headers: requestHeaders,
        body: requestBody,
        signal: controller.signal,
      });

      // The timer stays armed while the body is read so a stalled body times out.
      const contentType = response.headers.get("content-type");
      const isJson = contentType && contentType.includes("application/json");
      const responseData = isJson
        ? await response.json()
        : await response.text();

      if (config.logging.enableConsole) {
        console.error(
          `✅ API调用成功: ${response.status} ${response.statusText}`
        );
      }

      return {
        content: [
          {
            type: "text",
            text: `状态码: ${response.status} ${
              response.statusText
            }\n内容类型: ${
              contentType || "unknown"
            }\n响应数据: ${JSON.stringify(responseData, null, 2)}`,
          },
        ],
      };
    } catch (fetchError) {
      if (fetchError?.name === "AbortError") {
        throw new Error(`请求超时 (${config.http.timeout}ms)`, {
          cause: fetchError,
        });
      }
      throw fetchError;
    } finally {
      clearTimeout(timeoutId);
    }
  } catch (error) {
    if (error instanceof McpError) {
      throw error;
    }
    throw new McpError(
      ErrorCode.InternalError,
      `API调用失败: ${error.message}`
    );
  }
}
// Create the MCP server, identified by the configured name and version, and
// declare that it offers tools.
const server = new Server(
  { name: config.server.name, version: config.server.version },
  { capabilities: { tools: {} } }
);

// Tool listing: return the tool definitions built from the loaded Swagger document.
server.setRequestHandler(ListToolsRequestSchema, async () => ({
  tools: createUnifiedApiTools(),
}));

// Tool invocation: dispatch by tool name. McpError instances pass through
// unchanged; any other error is wrapped as an InternalError.
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  // A Map is used so only the three registered names can ever match.
  const toolHandlers = new Map([
    ["api_call", executeUnifiedApiCall],
    ["api_list_endpoints", listEndpoints],
    ["api_get_endpoint_info", getEndpointDetails],
  ]);

  try {
    const handler = toolHandlers.get(name);
    if (!handler) {
      throw new McpError(ErrorCode.InvalidRequest, `未知工具: ${name}`);
    }
    return await handler(args);
  } catch (error) {
    if (error instanceof McpError) {
      throw error;
    }
    throw new McpError(ErrorCode.InternalError, error.message);
  }
});
/**
 * Work out the base URL that API calls are sent to.
 *
 * When the document declares `host`, the result is
 * `<schemes[0] or "https">://<host><basePath>`. When `host` is absent and the
 * document was loaded from a URL, that URL's host (including port) is used,
 * with its scheme as the fallback when `schemes` is also absent. This follows
 * the Swagger 2.0 rule that a missing host means the host serving the
 * documentation.
 *
 * @param {object} swaggerDoc - Loaded Swagger document.
 * @param {string|boolean|undefined} sourceUrl - URL the document was fetched
 *   from, if any.
 * @returns {string|null} The base URL, or null when no host can be determined.
 */
function resolveApiBaseUrl(swaggerDoc, sourceUrl) {
  const basePath = swaggerDoc.basePath || "";
  const declaredScheme = swaggerDoc.schemes?.[0];

  if (swaggerDoc.host) {
    return `${declaredScheme || "https"}://${swaggerDoc.host}${basePath}`;
  }

  if (typeof sourceUrl === "string") {
    try {
      const source = new URL(sourceUrl);
      // URL.protocol includes the trailing ":".
      const scheme = declaredScheme || source.protocol.slice(0, -1);
      return `${scheme}://${source.host}${basePath}`;
    } catch {
      // Not a parseable URL: fall through and report "no host".
    }
  }

  return null;
}

/**
 * Start the server: load the Swagger document, publish it and the API base URL
 * on `global`, connect the MCP server over stdio, and log a startup summary.
 *
 * Exits the process with status 1 (after printing usage hints) when the
 * Swagger document cannot be loaded or validated.
 *
 * @returns {Promise<void>}
 */
async function main() {
  let swaggerDoc;
  try {
    swaggerDoc = await loadSwaggerDoc();
  } catch (error) {
    console.error("❌ 无法读取或验证Swagger文档:", error.message);
    console.error("\n💡 使用提示:");
    console.error(" 本地文件: node src/index.js");
    console.error(
      " 远程URL: node src/index.js --url https://example.com/swagger.json"
    );
    console.error(
      " 或者: node src/index.js --swagger https://example.com/api-docs"
    );
    process.exit(1);
  }

  const swaggerUrl = cmdArgs.url || cmdArgs.swagger;

  let baseUrl = resolveApiBaseUrl(swaggerDoc, swaggerUrl);
  if (baseUrl === null) {
    // No host anywhere: keep the server usable for browsing endpoints, but
    // make it obvious why api_call will not reach anything.
    console.error(
      "⚠️ Swagger文档未指定host,且无法从远程URL推断,api_call 将无法访问API"
    );
    baseUrl = `${swaggerDoc.schemes?.[0] || "https"}://${swaggerDoc.host}${
      swaggerDoc.basePath || ""
    }`;
  }

  // Shared with the request-building and lookup functions via globals.
  global.swaggerDoc = swaggerDoc;
  global.baseUrl = baseUrl;

  const transport = new StdioServerTransport();
  await server.connect(transport);

  const tools = createUnifiedApiTools();

  // Count operations only: path items may also hold `parameters`, `$ref` or
  // `x-` extension keys, which are not endpoints.
  const httpMethods = new Set([
    "get",
    "put",
    "post",
    "delete",
    "options",
    "head",
    "patch",
  ]);
  const totalEndpoints = Object.values(swaggerDoc.paths).reduce(
    (count, pathItem) => {
      if (!pathItem || typeof pathItem !== "object") return count;
      const operations = Object.keys(pathItem).filter((key) =>
        httpMethods.has(key.toLowerCase())
      );
      return count + operations.length;
    },
    0
  );

  if (config.logging.enableConsole) {
    console.error("🎉 Swagger MCP服务器已启动");
    console.error(`📡 基础URL: ${baseUrl}`);
    console.error(`🔧 MCP工具数量: ${tools.length} (统一API调用工具)`);
    console.error(`📊 API端点数量: ${totalEndpoints} 个`);
    console.error(
      `⚙️ 配置: 超时${config.http.timeout}ms, 重试${config.http.maxRetries}次`
    );

    if (swaggerUrl) {
      console.error(`🌐 数据源: 远程URL - ${swaggerUrl}`);
    } else {
      console.error(`📁 数据源: 本地文件 - ${config.swagger.filePath}`);
    }

    console.error("\n📋 可用MCP工具:");
    tools.forEach((tool, index) => {
      console.error(` ${index + 1}. ${tool.name} - ${tool.description}`);
    });
    console.error(
      `\n💡 提示: 使用 api_list_endpoints 工具查看所有 ${totalEndpoints} 个API端点`
    );
  }
}
// Entry point: run main(); if it rejects, log the error and exit with status 1.
main().catch((error) => {
console.error("服务器启动失败:", error);
process.exit(1);
});