Skip to main content
Glama
index.js24 kB
import fs from "fs"; import path from "path"; import { fileURLToPath } from "url"; import { Server } from "@modelcontextprotocol/sdk/server/index.js"; import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"; import { CallToolRequestSchema, ErrorCode, ListToolsRequestSchema, McpError, } from "@modelcontextprotocol/sdk/types.js"; import { z } from "zod"; const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); /** * 解析命令行参数 */ function parseCommandLineArgs() { const args = process.argv.slice(2); const params = {}; for (let i = 0; i < args.length; i++) { const arg = args[i]; if (arg.startsWith("--")) { // 处理 --key=value 格式 if (arg.includes("=")) { const [key, ...valueParts] = arg.substring(2).split("="); params[key] = valueParts.join("="); // 重新组合value,以防value中包含= } else { // 处理 --key value 格式 const key = arg.substring(2); const value = args[i + 1]; if (value && !value.startsWith("--")) { params[key] = value; i++; // 跳过下一个参数,因为它是当前参数的值 } else { params[key] = true; // 布尔标志 } } } } return params; } /** * 验证URL格式 */ function isValidUrl(string) { try { const url = new URL(string); return url.protocol === "http:" || url.protocol === "https:"; } catch (_) { return false; } } /** * 从远程URL获取Swagger文档 */ async function fetchSwaggerFromUrl(url, timeout = 10000) { const controller = new AbortController(); const timeoutId = setTimeout(() => controller.abort(), timeout); try { console.error(`🌐 正在从远程URL获取Swagger文档: ${url}`); const response = await fetch(url, { method: "GET", headers: { Accept: "application/json", "User-Agent": "Swagger-MCP-Server/1.0.0", }, signal: controller.signal, }); clearTimeout(timeoutId); if (!response.ok) { throw new Error(`HTTP错误: ${response.status} ${response.statusText}`); } const contentType = response.headers.get("content-type"); if (!contentType || !contentType.includes("application/json")) { console.warn(`⚠️ 警告: 响应内容类型不是JSON: ${contentType}`); } const swaggerDoc = await response.json(); console.error(`✅ 成功从远程URL获取Swagger文档`); return swaggerDoc; } catch (error) { clearTimeout(timeoutId); if (error.name === "AbortError") { throw new Error(`远程URL请求超时 (${timeout}ms)`); } throw new Error(`无法从远程URL获取Swagger文档: ${error.message}`); } } // 解析命令行参数 const cmdArgs = parseCommandLineArgs(); // 读取配置文件 let config; try { const configPath = path.join(__dirname, "..", "config.json"); config = JSON.parse(fs.readFileSync(configPath, "utf-8")); } catch (error) { console.error("无法读取config.json文件,使用默认配置:", error.message); config = { server: { name: "swagger-mcp-server", version: "1.0.0" }, swagger: { filePath: "./swagger.json", validateSchema: true }, http: { timeout: 30000, userAgent: "Swagger-MCP-Server/1.0.0", maxRetries: 3, retryDelay: 1000, }, logging: { level: "info", enableConsole: true }, }; } /** * 验证Swagger文档格式 */ function validateSwaggerDoc(swaggerDoc) { if (!config.swagger.validateSchema) { return; } if (!swaggerDoc.swagger || !swaggerDoc.swagger.startsWith("2.")) { throw new Error("不是有效的Swagger 2.0文档"); } if (!swaggerDoc.info || !swaggerDoc.info.title) { throw new Error("缺少必需的info.title字段"); } if (!swaggerDoc.paths || typeof swaggerDoc.paths !== "object") { throw new Error("缺少必需的paths字段"); } } /** * 加载Swagger文档 */ async function loadSwaggerDoc() { let swaggerDoc; // 检查是否通过命令行参数指定了URL const swaggerUrl = cmdArgs.url || cmdArgs.swagger; if (swaggerUrl) { // 验证URL格式 if (!isValidUrl(swaggerUrl)) { throw new Error(`无效的URL格式: ${swaggerUrl}`); } // 检查是否允许远程URL if (!config.swagger.allowRemoteUrl) { throw new Error( "配置不允许使用远程URL,请在config.json中设置allowRemoteUrl为true" ); } // 从远程URL获取Swagger文档 swaggerDoc = await fetchSwaggerFromUrl( swaggerUrl, config.swagger.urlTimeout ); // 可选:缓存远程文档到本地 if (config.swagger.cacheRemoteSpec) { try { const cacheFileName = `swagger-cache-${Date.now()}.json`; const cachePath = path.join(__dirname, "..", cacheFileName); fs.writeFileSync(cachePath, JSON.stringify(swaggerDoc, null, 2)); console.error(`💾 已缓存远程Swagger文档到: ${cacheFileName}`); } catch (cacheError) { console.warn(`⚠️ 无法缓存远程文档: ${cacheError.message}`); } } } else { // 从本地文件读取 const swaggerPath = path.resolve(__dirname, "..", config.swagger.filePath); if (!fs.existsSync(swaggerPath)) { throw new Error(`本地Swagger文件不存在: ${swaggerPath}`); } swaggerDoc = JSON.parse(fs.readFileSync(swaggerPath, "utf-8")); console.error(`📁 从本地文件加载Swagger文档: ${swaggerPath}`); } // 验证文档格式 validateSwaggerDoc(swaggerDoc); console.error( `✅ 成功加载Swagger文档: ${swaggerDoc.info.title} v${ swaggerDoc.info.version || "未知版本" }` ); return swaggerDoc; } /** * 将Swagger参数转换为JSON Schema类型 */ function swaggerTypeToJsonSchema(swaggerParam) { const typeMapping = { string: "string", integer: "number", number: "number", boolean: "boolean", array: "array", object: "object", file: "string", // 文件上传参数作为字符串处理 }; const schema = { type: typeMapping[swaggerParam.type] || "string", }; // 添加描述 if (swaggerParam.description) { schema.description = swaggerParam.description; } // 处理枚举值 if (swaggerParam.enum) { schema.enum = swaggerParam.enum; } // 处理数组类型 if (swaggerParam.type === "array" && swaggerParam.items) { schema.items = swaggerTypeToJsonSchema(swaggerParam.items); } // 处理数字范围 if (swaggerParam.minimum !== undefined) { schema.minimum = swaggerParam.minimum; } if (swaggerParam.maximum !== undefined) { schema.maximum = swaggerParam.maximum; } // 处理字符串长度 if (swaggerParam.minLength !== undefined) { schema.minLength = swaggerParam.minLength; } if (swaggerParam.maxLength !== undefined) { schema.maxLength = swaggerParam.maxLength; } return schema; } /** * 创建统一的API调用工具 */ function createUnifiedApiTools() { const swaggerDoc = global.swaggerDoc; return [ { name: "api_call", description: `调用${swaggerDoc.info.title} API的通用工具。支持所有HTTP方法和路径。`, inputSchema: { type: "object", properties: { method: { type: "string", enum: ["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"], description: "HTTP方法", }, path: { type: "string", description: "API路径,例如: /users/{id} 或 /posts", }, pathParams: { type: "object", description: '路径参数,例如: {"id": "123"}', additionalProperties: true, }, queryParams: { type: "object", description: '查询参数,例如: {"limit": 10, "offset": 0}', additionalProperties: true, }, headers: { type: "object", description: '请求头,例如: {"Authorization": "Bearer token"}', additionalProperties: true, }, body: { type: "object", description: "请求体数据(用于POST/PUT等方法)", additionalProperties: true, }, }, required: ["method", "path"], }, }, { name: "api_list_endpoints", description: `列出${swaggerDoc.info.title} API的所有可用端点`, inputSchema: { type: "object", properties: { filter: { type: "string", description: "过滤端点的关键词(可选)", }, method: { type: "string", enum: ["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"], description: "按HTTP方法过滤(可选)", }, }, }, }, { name: "api_get_endpoint_info", description: `获取${swaggerDoc.info.title} API特定端点的详细信息`, inputSchema: { type: "object", properties: { method: { type: "string", enum: ["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"], description: "HTTP方法", }, path: { type: "string", description: "API路径", }, }, required: ["method", "path"], }, }, ]; } /** * 构建统一API调用的请求 */ function buildUnifiedApiRequest( method, path, pathParams = {}, queryParams = {}, headers = {}, body = null ) { const baseUrl = global.baseUrl; // 处理路径参数 let url = baseUrl + path; for (const [key, value] of Object.entries(pathParams)) { url = url.replace(`{${key}}`, encodeURIComponent(value)); } // 处理查询参数 const query = new URLSearchParams(); for (const [key, value] of Object.entries(queryParams)) { if (value !== undefined && value !== null) { query.append(key, value); } } if (query.toString()) { url += "?" + query.toString(); } // 处理请求头 const requestHeaders = { "User-Agent": config.http.userAgent, ...headers, }; // 处理请求体 let requestBody = null; if (body && (method === "POST" || method === "PUT" || method === "PATCH")) { requestBody = typeof body === "string" ? body : JSON.stringify(body); if (!requestHeaders["Content-Type"]) { requestHeaders["Content-Type"] = "application/json"; } } return { url, headers: requestHeaders, body: requestBody }; } /** * 列出所有API端点 */ function listApiEndpoints(filter = "", methodFilter = "") { const swaggerDoc = global.swaggerDoc; const endpoints = []; for (const [path, methods] of Object.entries(swaggerDoc.paths)) { for (const [method, operation] of Object.entries(methods)) { if (typeof operation !== "object" || !operation) continue; const endpoint = { method: method.toUpperCase(), path: path, summary: operation.summary || "", description: operation.description || "", operationId: operation.operationId || "", tags: operation.tags || [], }; // 应用过滤器 if ( filter && !JSON.stringify(endpoint).toLowerCase().includes(filter.toLowerCase()) ) { continue; } if (methodFilter && endpoint.method !== methodFilter.toUpperCase()) { continue; } endpoints.push(endpoint); } } return endpoints; } /** * 获取特定端点的详细信息 */ function getEndpointInfo(method, path) { const swaggerDoc = global.swaggerDoc; if ( !swaggerDoc.paths[path] || !swaggerDoc.paths[path][method.toLowerCase()] ) { throw new Error(`端点 ${method.toUpperCase()} ${path} 不存在`); } const operation = swaggerDoc.paths[path][method.toLowerCase()]; const params = operation.parameters || []; return { method: method.toUpperCase(), path: path, summary: operation.summary || "", description: operation.description || "", operationId: operation.operationId || "", tags: operation.tags || [], parameters: params.map((param) => ({ name: param.name, in: param.in, type: param.type, required: param.required || false, description: param.description || "", })), responses: operation.responses || {}, }; } /** * 延迟函数 */ function delay(ms) { return new Promise((resolve) => setTimeout(resolve, ms)); } /** * 验证必需参数 */ function validateRequiredParameters(operation, args) { const params = operation.parameters || []; const missingParams = []; for (const param of params) { if ( param.required && (args[param.name] === undefined || args[param.name] === null) ) { missingParams.push(param.name); } } if (missingParams.length > 0) { throw new McpError( ErrorCode.InvalidParams, `缺少必需参数: ${missingParams.join(", ")}` ); } } /** * 执行统一API调用 */ async function executeUnifiedApiCall(args) { const { method, path, pathParams = {}, queryParams = {}, headers = {}, body, } = args; if (!method || !path) { throw new McpError(ErrorCode.InvalidParams, "method和path参数是必需的"); } let lastError; for (let attempt = 1; attempt <= config.http.maxRetries; attempt++) { try { const { url, headers: requestHeaders, body: requestBody, } = buildUnifiedApiRequest( method, path, pathParams, queryParams, headers, body ); if (config.logging.enableConsole) { console.error(`🚀 发起API调用: ${method.toUpperCase()} ${url}`); } const controller = new AbortController(); const timeoutId = setTimeout( () => controller.abort(), config.http.timeout ); try { const response = await fetch(url, { method: method.toUpperCase(), headers: requestHeaders, body: requestBody, signal: controller.signal, }); clearTimeout(timeoutId); const contentType = response.headers.get("content-type"); let responseData; if (contentType && contentType.includes("application/json")) { responseData = await response.json(); } else { responseData = await response.text(); } if (config.logging.enableConsole) { console.error( `✅ API调用成功: ${response.status} ${response.statusText}` ); } return { content: [ { type: "text", text: `状态码: ${response.status} ${ response.statusText }\n内容类型: ${ contentType || "unknown" }\n响应数据: ${JSON.stringify(responseData, null, 2)}`, }, ], }; } catch (fetchError) { clearTimeout(timeoutId); if (fetchError.name === "AbortError") { throw new Error(`请求超时 (${config.http.timeout}ms)`); } throw fetchError; } } catch (error) { lastError = error; if (config.logging.enableConsole) { console.error( `🔄 API调用失败 (尝试 ${attempt}/${config.http.maxRetries}):`, error.message ); } if (attempt === config.http.maxRetries || error instanceof McpError) { break; } await delay(config.http.retryDelay * attempt); } } if (lastError instanceof McpError) { throw lastError; } throw new McpError( ErrorCode.InternalError, `API调用失败: ${lastError.message}` ); } /** * 列出端点 */ async function listEndpoints(args) { const { filter = "", method = "" } = args || {}; try { const endpoints = listApiEndpoints(filter, method); return { content: [ { type: "text", text: `找到 ${endpoints.length} 个API端点:\n\n${endpoints .map( (ep) => `${ep.method} ${ep.path}\n ${ ep.summary || ep.description || "无描述" }\n 标签: ${ep.tags.join(", ") || "无"}` ) .join("\n\n")}`, }, ], }; } catch (error) { throw new McpError( ErrorCode.InternalError, `获取端点列表失败: ${error.message}` ); } } /** * 获取端点详情 */ async function getEndpointDetails(args) { const { method, path } = args || {}; if (!method || !path) { throw new McpError(ErrorCode.InvalidParams, "method和path参数是必需的"); } try { const info = getEndpointInfo(method, path); const paramInfo = info.parameters .map( (p) => ` - ${p.name} (${p.in}): ${p.type} ${ p.required ? "[必需]" : "[可选]" }\n ${p.description || "无描述"}` ) .join("\n"); return { content: [ { type: "text", text: `端点详情: ${info.method} ${info.path}\n\n` + `摘要: ${info.summary}\n` + `描述: ${info.description}\n` + `操作ID: ${info.operationId}\n` + `标签: ${info.tags.join(", ")}\n\n` + `参数:\n${paramInfo || " 无参数"}\n\n` + `响应: ${JSON.stringify(info.responses, null, 2)}`, }, ], }; } catch (error) { throw new McpError( ErrorCode.InternalError, `获取端点详情失败: ${error.message}` ); } } /** * 执行单次API调用 */ async function performApiCall(toolName, args) { try { const swaggerDoc = global.swaggerDoc; // 解析工具名称,提取HTTP方法和路径 const parts = toolName.split("_"); const method = parts[0].toLowerCase(); // 找到对应的路径 let targetPath = null; let targetOperation = null; for (const [path, methods] of Object.entries(swaggerDoc.paths)) { if (methods[method]) { const pathToolName = `${method.toUpperCase()}_${path .replace(/[{}]/g, "") .replace(/\//g, "_")}`; if (pathToolName === toolName) { targetPath = path; targetOperation = methods[method]; break; } } } if (!targetPath || !targetOperation) { throw new McpError(ErrorCode.InvalidRequest, `未找到工具: ${toolName}`); } // 验证必需参数 validateRequiredParameters(targetOperation, args); // 构建请求 const { url, headers, body } = buildRequestUrl(targetPath, method, args); // 设置默认请求头 const requestHeaders = { "User-Agent": config.http.userAgent, ...headers, }; if (config.logging.enableConsole) { console.error(`🚀 发起API调用: ${method.toUpperCase()} ${url}`); } // 创建AbortController用于超时控制 const controller = new AbortController(); const timeoutId = setTimeout(() => controller.abort(), config.http.timeout); try { // 执行请求 const response = await fetch(url, { method: method.toUpperCase(), headers: requestHeaders, body: body, signal: controller.signal, }); clearTimeout(timeoutId); // 处理响应 const contentType = response.headers.get("content-type"); let responseData; if (contentType && contentType.includes("application/json")) { responseData = await response.json(); } else { responseData = await response.text(); } // 记录成功的响应 if (config.logging.enableConsole) { console.error( `✅ API调用成功: ${response.status} ${response.statusText}` ); } // 返回结果 return { content: [ { type: "text", text: `状态码: ${response.status} ${ response.statusText }\n内容类型: ${ contentType || "unknown" }\n响应数据: ${JSON.stringify(responseData, null, 2)}`, }, ], }; } catch (fetchError) { clearTimeout(timeoutId); if (fetchError.name === "AbortError") { throw new Error(`请求超时 (${config.http.timeout}ms)`); } throw fetchError; } } catch (error) { if (error instanceof McpError) { throw error; } throw new McpError( ErrorCode.InternalError, `API调用失败: ${error.message}` ); } } // 创建MCP服务器 const server = new Server( { name: config.server.name, version: config.server.version, }, { capabilities: { tools: {}, }, } ); // 注册工具列表处理器 server.setRequestHandler(ListToolsRequestSchema, async () => { const tools = createUnifiedApiTools(); return { tools: tools, }; }); // 注册工具调用处理器 server.setRequestHandler(CallToolRequestSchema, async (request) => { const { name, arguments: args } = request.params; try { switch (name) { case "api_call": return await executeUnifiedApiCall(args); case "api_list_endpoints": return await listEndpoints(args); case "api_get_endpoint_info": return await getEndpointDetails(args); default: throw new McpError(ErrorCode.InvalidRequest, `未知工具: ${name}`); } } catch (error) { if (error instanceof McpError) { throw error; } throw new McpError(ErrorCode.InternalError, error.message); } }); // 启动服务器 async function main() { // 加载swagger.json文件 let swaggerDoc; try { swaggerDoc = await loadSwaggerDoc(); } catch (error) { console.error("❌ 无法读取或验证Swagger文档:", error.message); console.error("\n💡 使用提示:"); console.error(" 本地文件: node src/index.js"); console.error( " 远程URL: node src/index.js --url https://example.com/swagger.json" ); console.error( " 或者: node src/index.js --swagger https://example.com/api-docs" ); process.exit(1); } // 构建基础URL const baseUrl = `${swaggerDoc.schemes?.[0] || "https"}://${swaggerDoc.host}${ swaggerDoc.basePath || "" }`; // 将swaggerDoc设置为全局变量,供其他函数使用 global.swaggerDoc = swaggerDoc; global.baseUrl = baseUrl; const transport = new StdioServerTransport(); await server.connect(transport); const tools = createUnifiedApiTools(); const totalEndpoints = Object.keys(swaggerDoc.paths).reduce((count, path) => { return count + Object.keys(swaggerDoc.paths[path]).length; }, 0); if (config.logging.enableConsole) { console.error("🎉 Swagger MCP服务器已启动"); console.error(`📡 基础URL: ${baseUrl}`); console.error(`🔧 MCP工具数量: ${tools.length} (统一API调用工具)`); console.error(`📊 API端点数量: ${totalEndpoints} 个`); console.error( `⚙️ 配置: 超时${config.http.timeout}ms, 重试${config.http.maxRetries}次` ); // 显示数据源信息 const swaggerUrl = cmdArgs.url || cmdArgs.swagger; if (swaggerUrl) { console.error(`🌐 数据源: 远程URL - ${swaggerUrl}`); } else { console.error(`📁 数据源: 本地文件 - ${config.swagger.filePath}`); } // 列出所有可用工具 console.error("\n📋 可用MCP工具:"); tools.forEach((tool, index) => { console.error(` ${index + 1}. ${tool.name} - ${tool.description}`); }); console.error( `\n💡 提示: 使用 api_list_endpoints 工具查看所有 ${totalEndpoints} 个API端点` ); } } main().catch((error) => { console.error("服务器启动失败:", error); process.exit(1); });

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/zidong0822/swagger-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server