Skip to main content
Glama
index.js15.3 kB
#!/usr/bin/env node import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js"; import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js"; import { z } from "zod"; import { SupersetApiService } from "./services/superset-api.js"; import dotenv from "dotenv"; // 加载环境变量 dotenv.config(); // Superset API 配置 const SUPERSET_URL = process.env.SUPERSET_URL || ""; const SUPERSET_USERNAME = process.env.SUPERSET_USERNAME || ""; const SUPERSET_PASSWORD = process.env.SUPERSET_PASSWORD || ""; // 创建 Superset API 服务实例 const supersetApi = new SupersetApiService({ baseUrl: SUPERSET_URL, username: SUPERSET_USERNAME, password: SUPERSET_PASSWORD, withCredentials: true, }); // 创建服务器实例 const server = new McpServer({ name: "superset-query", version: "1.0.0", }); // 缓存数据库和表信息 let databasesCache = []; let tablesCache = new Map(); let fieldsCache = new Map(); // key: `${databaseId}:${schema}:${tableName}` // 初始化缓存 async function initializeCache() { try { console.log("初始化数据库和表缓存..."); // 获取所有数据库 databasesCache = await supersetApi.getDatabases(); console.log(`缓存了 ${databasesCache.length} 个数据库`); // 获取每个数据库的表 for (const db of databasesCache) { try { const schemas = await supersetApi.getSchemas(db.id); if (schemas.length === 0) { console.log(`数据库 ${db.database_name} (ID: ${db.id}) 没有找到schema`); continue; } const tables = []; for (const schema of schemas) { tables.push(...await supersetApi.getTables(db.id, schema)); } tablesCache.set(db.id, tables); console.log(`缓存了数据库 ${db.database_name} (ID: ${db.id}) 的 ${tables.length} 个表`); } catch (error) { console.error(`获取数据库 ${db.database_name} (ID: ${db.id}) 的表失败:`, error); } } console.log("缓存初始化完成"); } catch (error) { console.error("初始化缓存失败:", error); } } // 获取表的字段信息并缓存 async function getTableFields(databaseId, schema, tableName) { const cacheKey = `${databaseId}:${schema}:${tableName}`; if (fieldsCache.has(cacheKey)) { return fieldsCache.get(cacheKey) || []; } try { const metadata = await supersetApi.getTableMetadata(databaseId, schema, tableName); fieldsCache.set(cacheKey, metadata.columns); return metadata.columns; } catch (error) { console.error(`获取表 ${schema}.${tableName} 的字段失败:`, error); return []; } } // 根据自然语言查询找到最匹配的表 function findMatchingTable(query) { // 简单的匹配算法,可以根据需要改进 for (const db of databasesCache) { const tables = tablesCache.get(db.id) || []; for (const table of tables) { // 检查表名是否在查询中出现 if (query.toLowerCase().includes(table.name.toLowerCase())) { return { database: db, table }; } } } // 如果没有找到匹配的表,返回第一个数据库的第一个表(如果有) if (databasesCache.length > 0) { const firstDb = databasesCache[0]; const firstDbTables = tablesCache.get(firstDb.id) || []; if (firstDbTables.length > 0) { return { database: firstDb, table: firstDbTables[0] }; } } return null; } // 根据自然语言查询和表信息生成 SQL 查询 function generateSqlQuery(query, table, fields) { // 基本的 SQL 生成,可以根据需要改进 const fieldNames = fields.map(f => `"${f.name}"`).join(", "); // 提取可能的过滤条件 let whereClause = ""; const keywords = ["where", "filter", "条件", "筛选"]; for (const keyword of keywords) { const keywordIndex = query.toLowerCase().indexOf(keyword); if (keywordIndex !== -1) { const condition = query.substring(keywordIndex + keyword.length).trim(); if (condition) { // 尝试从条件中提取字段名和值 for (const field of fields) { if (condition.toLowerCase().includes(field.name.toLowerCase())) { // 简单的条件提取,实际应用中可能需要更复杂的解析 whereClause = `WHERE "${field.name}" LIKE '%${condition.replace(/['"]/g, "")}%'`; break; } } } } } // 提取可能的限制行数 let limitClause = "LIMIT 10"; // 默认限制 const limitKeywords = ["limit", "top", "限制", "前"]; for (const keyword of limitKeywords) { const keywordIndex = query.toLowerCase().indexOf(keyword); if (keywordIndex !== -1) { const limitText = query.substring(keywordIndex + keyword.length).trim().split(/\s+/)[0]; const limit = parseInt(limitText); if (!isNaN(limit) && limit > 0) { limitClause = `LIMIT ${limit}`; } } } return `SELECT ${fieldNames} FROM "${table.schema}"."${table.name}" ${whereClause} ${limitClause}`; } // 注册 Superset 查询工具 server.tool("query-superset", "执行 Superset 数据查询", { query: z.string().describe("用户的自然语言查询,例如'查询最近10条日志'"), databaseId: z.number().optional().describe("可选的数据库ID,如果不提供将自动选择匹配的数据库"), schema: z.string().optional().describe("可选的schema名称"), tableName: z.string().optional().describe("可选的表名"), }, async ({ query, databaseId, schema, tableName }) => { try { // 如果缓存为空,初始化缓存 if (databasesCache.length === 0) { initializeCache(); return { content: [ { type: "text", text: "初始化缓存中...", }, ], }; } let selectedDb; let selectedTable; // 如果提供了明确的数据库ID和表名 if (databaseId !== undefined && schema && tableName) { selectedDb = databasesCache.find(db => db.id === databaseId); if (selectedDb) { const tables = tablesCache.get(selectedDb.id) || []; selectedTable = tables.find(t => t.schema === schema && t.name === tableName); } } else { // 否则,根据查询自动选择匹配的表 const match = findMatchingTable(query); if (match) { selectedDb = match.database; selectedTable = match.table; } } if (!selectedDb || !selectedTable) { return { content: [ { type: "text", text: "无法找到匹配的数据库或表。请提供更具体的查询或明确指定数据库ID和表名。", }, ], }; } // 获取表的字段信息 const fields = await getTableFields(selectedDb.id, selectedTable.schema, selectedTable.name); if (fields.length === 0) { return { content: [ { type: "text", text: `无法获取表 ${selectedTable.schema}.${selectedTable.name} 的字段信息。`, }, ], }; } // 生成 SQL 查询 const sql = generateSqlQuery(query, selectedTable, fields); // 执行查询 const queryRequest = { database_id: selectedDb.id, sql, schema: selectedTable.schema, client_id: `mcp_client_${Date.now()}`, sql_editor_id: `mcp_editor_${Date.now()}`, runAsync: false, json: true, }; console.log(`执行查询: ${sql}`); const queryResult = await supersetApi.executeQuery(queryRequest); // 处理查询结果 if (queryResult.status === "success") { // 格式化查询结果 const resultText = `查询结果 (${queryResult.data?.length || 0} 行):\n\n` + JSON.stringify(queryResult.data, null, 2); return { content: [ { type: "text", text: `数据库: ${selectedDb.database_name}\n表: ${selectedTable.schema}.${selectedTable.name}\n\nSQL: ${sql}\n\n${resultText}`, }, ], }; } else if (queryResult.status === "running" && queryResult.query_id) { // 异步查询,获取结果 const results = await supersetApi.getQueryResults(queryResult.query_id); const resultText = `查询结果 (${results.data?.length || 0} 行):\n\n` + JSON.stringify(results.data, null, 2); return { content: [ { type: "text", text: `数据库: ${selectedDb.database_name}\n表: ${selectedTable.schema}.${selectedTable.name}\n\nSQL: ${sql}\n\n${resultText}`, }, ], }; } else { return { content: [ { type: "text", text: `查询失败: ${queryResult.error?.message || "未知错误"}\n\nSQL: ${sql}`, }, ], }; } } catch (error) { console.error("执行查询时发生错误:", error); return { content: [ { type: "text", text: `执行查询时发生错误: ${error.message}`, }, ], }; } }); // 注册获取数据库列表工具 server.tool("list-databases", "获取所有可用的数据库列表", {}, async () => { try { // 如果缓存为空,初始化缓存 if (databasesCache.length === 0) { initializeCache(); return { content: [ { type: "text", text: "初始化缓存中...", }, ], }; } const databasesList = databasesCache.map(db => `ID: ${db.id}, 名称: ${db.database_name}`).join("\n"); return { content: [ { type: "text", text: `可用数据库列表:\n\n${databasesList}`, }, ], }; } catch (error) { console.error("获取数据库列表失败:", error); return { content: [ { type: "text", text: `获取数据库列表失败: ${error.message}`, }, ], }; } }); // 注册获取表列表工具 server.tool("list-tables", "获取指定数据库的表列表", { databaseId: z.number().describe("数据库ID"), schema: z.string().describe("Schema名称").optional(), }, async ({ databaseId, schema }) => { try { // 如果缓存为空,初始化缓存 if (databasesCache.length === 0) { initializeCache(); return { content: [ { type: "text", text: "初始化缓存中...", }, ], }; } const database = databasesCache.find(db => db.id === databaseId); if (!database) { return { content: [ { type: "text", text: `找不到ID为 ${databaseId} 的数据库`, }, ], }; } // 获取表列表 let tables = tablesCache.get(databaseId); // 如果提供了schema,则过滤表 if (tables && schema) { tables = tables.filter(table => table.schema === schema); } if (!tables || tables.length === 0) { // 如果缓存中没有,尝试重新获取 return { content: [ { type: "text", text: `数据库 ${database.database_name} 中没有找到表`, }, ], }; } const tablesList = tables.map(table => `Schema: ${table.schema}, 表名: ${table.name}`).join("\n"); return { content: [ { type: "text", text: `数据库 ${database.database_name} 的表列表:\n\n${tablesList}`, }, ], }; } catch (error) { console.error("获取表列表失败:", error); return { content: [ { type: "text", text: `获取表列表失败: ${error.message}`, }, ], }; } }); // 注册获取表字段工具 server.tool("list-fields", "获取指定表的字段列表", { databaseId: z.number().describe("数据库ID"), schema: z.string().describe("Schema名称"), tableName: z.string().describe("表名"), }, async ({ databaseId, schema, tableName }) => { try { const fields = await getTableFields(databaseId, schema, tableName); if (fields.length === 0) { return { content: [ { type: "text", text: `表 ${schema}.${tableName} 中没有找到字段`, }, ], }; } const fieldsList = fields.map(field => `名称: ${field.name}, 类型: ${field.type}`).join("\n"); return { content: [ { type: "text", text: `表 ${schema}.${tableName} 的字段列表:\n\n${fieldsList}`, }, ], }; } catch (error) { console.error("获取字段列表失败:", error); return { content: [ { type: "text", text: `获取字段列表失败: ${error.message}`, }, ], }; } }); // 初始化缓存 initializeCache().catch(error => { console.error("初始化缓存失败:", error); }); async function main() { const transport = new StdioServerTransport(); await server.connect(transport); } main().catch((error) => { console.error("Fatal error in main():", error); process.exit(1); }); //# sourceMappingURL=index.js.map

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/LiusCraft/superset-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server