query-superset
Execute natural language queries on Apache Superset databases to retrieve data, specify database, schema, or table if needed.
Instructions
执行 Superset 数据查询
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| databaseId | No | 可选的数据库ID,如果不提供将自动选择匹配的数据库 | |
| query | Yes | 用户的自然语言查询,例如'查询最近10条日志' | |
| schema | No | 可选的schema名称 | |
| tableName | No | 可选的表名 |
Implementation Reference
- src/index.ts:167-295 (handler)The main handler function for the 'query-superset' tool. It processes the natural language query, selects appropriate database and table (or uses provided), fetches fields, generates SQL, executes the query using SupersetApiService, and formats the results.async ({ query, databaseId, schema, tableName }) => { try { // 如果缓存为空,初始化缓存 if (databasesCache.length === 0) { initializeCache(); return { content: [ { type: "text", text: "初始化缓存中...", }, ], }; } let selectedDb: Database | undefined; let selectedTable: Table | undefined; // 如果提供了明确的数据库ID和表名 if (databaseId !== undefined && schema && tableName) { selectedDb = databasesCache.find(db => db.id === databaseId); if (selectedDb) { const tables = tablesCache.get(selectedDb.id) || []; selectedTable = tables.find(t => t.schema === schema && t.name === tableName); } } else { // 否则,根据查询自动选择匹配的表 const match = findMatchingTable(query); if (match) { selectedDb = match.database; selectedTable = match.table; } } if (!selectedDb || !selectedTable) { return { content: [ { type: "text", text: "无法找到匹配的数据库或表。请提供更具体的查询或明确指定数据库ID和表名。", }, ], }; } // 获取表的字段信息 const fields = await getTableFields(selectedDb.id, selectedTable.schema, selectedTable.name); if (fields.length === 0) { return { content: [ { type: "text", text: `无法获取表 ${selectedTable.schema}.${selectedTable.name} 的字段信息。`, }, ], }; } // 生成 SQL 查询 const sql = generateSqlQuery(query, selectedTable, fields); // 执行查询 const queryRequest: QueryRequest = { database_id: selectedDb.id, sql, schema: selectedTable.schema, client_id: `mcp_client_${Date.now()}`, sql_editor_id: `mcp_editor_${Date.now()}`, runAsync: false, json: true, }; console.log(`执行查询: ${sql}`); const queryResult = await supersetApi.executeQuery(queryRequest); // 处理查询结果 if (queryResult.status === "success") { // 格式化查询结果 const resultText = `查询结果 (${queryResult.data?.length || 0} 行):\n\n` + JSON.stringify(queryResult.data, null, 2); return { content: [ { type: "text", text: `数据库: ${selectedDb.database_name}\n表: ${selectedTable.schema}.${selectedTable.name}\n\nSQL: ${sql}\n\n${resultText}`, }, ], }; } else if (queryResult.status === "running" && queryResult.query_id) { // 异步查询,获取结果 const results = await supersetApi.getQueryResults(queryResult.query_id); const resultText = `查询结果 (${results.data?.length || 0} 行):\n\n` + JSON.stringify(results.data, null, 2); return { content: [ { type: "text", text: `数据库: ${selectedDb.database_name}\n表: ${selectedTable.schema}.${selectedTable.name}\n\nSQL: ${sql}\n\n${resultText}`, }, ], }; } else { return { content: [ { type: "text", text: `查询失败: ${queryResult.error?.message || "未知错误"}\n\nSQL: ${sql}`, }, ], }; } } catch (error) { console.error("执行查询时发生错误:", error); return { content: [ { type: "text", text: `执行查询时发生错误: ${(error as Error).message}`, }, ], }; } }
- src/index.ts:162-166 (schema)Input schema definition using Zod for validating tool parameters: query (required string), optional databaseId, schema, tableName.query: z.string().describe("用户的自然语言查询,例如'查询最近10条日志'"), databaseId: z.number().optional().describe("可选的数据库ID,如果不提供将自动选择匹配的数据库"), schema: z.string().optional().describe("可选的schema名称"), tableName: z.string().optional().describe("可选的表名"), },
- src/index.ts:158-296 (registration)Registration of the 'query-superset' tool on the MCP server using server.tool(), including name, description, input schema, and handler function.server.tool( "query-superset", "执行 Superset 数据查询", { query: z.string().describe("用户的自然语言查询,例如'查询最近10条日志'"), databaseId: z.number().optional().describe("可选的数据库ID,如果不提供将自动选择匹配的数据库"), schema: z.string().optional().describe("可选的schema名称"), tableName: z.string().optional().describe("可选的表名"), }, async ({ query, databaseId, schema, tableName }) => { try { // 如果缓存为空,初始化缓存 if (databasesCache.length === 0) { initializeCache(); return { content: [ { type: "text", text: "初始化缓存中...", }, ], }; } let selectedDb: Database | undefined; let selectedTable: Table | undefined; // 如果提供了明确的数据库ID和表名 if (databaseId !== undefined && schema && tableName) { selectedDb = databasesCache.find(db => db.id === databaseId); if (selectedDb) { const tables = tablesCache.get(selectedDb.id) || []; selectedTable = tables.find(t => t.schema === schema && t.name === tableName); } } else { // 否则,根据查询自动选择匹配的表 const match = findMatchingTable(query); if (match) { selectedDb = match.database; selectedTable = match.table; } } if (!selectedDb || !selectedTable) { return { content: [ { type: "text", text: "无法找到匹配的数据库或表。请提供更具体的查询或明确指定数据库ID和表名。", }, ], }; } // 获取表的字段信息 const fields = await getTableFields(selectedDb.id, selectedTable.schema, selectedTable.name); if (fields.length === 0) { return { content: [ { type: "text", text: `无法获取表 ${selectedTable.schema}.${selectedTable.name} 的字段信息。`, }, ], }; } // 生成 SQL 查询 const sql = generateSqlQuery(query, selectedTable, fields); // 执行查询 const queryRequest: QueryRequest = { database_id: selectedDb.id, sql, schema: selectedTable.schema, client_id: `mcp_client_${Date.now()}`, sql_editor_id: `mcp_editor_${Date.now()}`, runAsync: false, json: true, }; console.log(`执行查询: ${sql}`); const queryResult = await supersetApi.executeQuery(queryRequest); // 处理查询结果 if (queryResult.status === "success") { // 格式化查询结果 const resultText = `查询结果 (${queryResult.data?.length || 0} 行):\n\n` + JSON.stringify(queryResult.data, null, 2); return { content: [ { type: "text", text: `数据库: ${selectedDb.database_name}\n表: ${selectedTable.schema}.${selectedTable.name}\n\nSQL: ${sql}\n\n${resultText}`, }, ], }; } else if (queryResult.status === "running" && queryResult.query_id) { // 异步查询,获取结果 const results = await supersetApi.getQueryResults(queryResult.query_id); const resultText = `查询结果 (${results.data?.length || 0} 行):\n\n` + JSON.stringify(results.data, null, 2); return { content: [ { type: "text", text: `数据库: ${selectedDb.database_name}\n表: ${selectedTable.schema}.${selectedTable.name}\n\nSQL: ${sql}\n\n${resultText}`, }, ], }; } else { return { content: [ { type: "text", text: `查询失败: ${queryResult.error?.message || "未知错误"}\n\nSQL: ${sql}`, }, ], }; } } catch (error) { console.error("执行查询时发生错误:", error); return { content: [ { type: "text", text: `执行查询时发生错误: ${(error as Error).message}`, }, ], }; } } );
- src/index.ts:114-155 (helper)Helper function to generate SQL query from natural language input, table info, and fields, including basic WHERE and LIMIT extraction.function generateSqlQuery(query: string, table: Table, fields: Field[]): string { // 基本的 SQL 生成,可以根据需要改进 const fieldNames = fields.map(f => `"${f.name}"`).join(", "); // 提取可能的过滤条件 let whereClause = ""; const keywords = ["where", "filter", "条件", "筛选"]; for (const keyword of keywords) { const keywordIndex = query.toLowerCase().indexOf(keyword); if (keywordIndex !== -1) { const condition = query.substring(keywordIndex + keyword.length).trim(); if (condition) { // 尝试从条件中提取字段名和值 for (const field of fields) { if (condition.toLowerCase().includes(field.name.toLowerCase())) { // 简单的条件提取,实际应用中可能需要更复杂的解析 whereClause = `WHERE "${field.name}" LIKE '%${condition.replace(/['"]/g, "")}%'`; break; } } } } } // 提取可能的限制行数 let limitClause = "LIMIT 10"; // 默认限制 const limitKeywords = ["limit", "top", "限制", "前"]; for (const keyword of limitKeywords) { const keywordIndex = query.toLowerCase().indexOf(keyword); if (keywordIndex !== -1) { const limitText = query.substring(keywordIndex + keyword.length).trim().split(/\s+/)[0]; const limit = parseInt(limitText); if (!isNaN(limit) && limit > 0) { limitClause = `LIMIT ${limit}`; } } } return `SELECT ${fieldNames} FROM "${table.schema}"."${table.name}" ${whereClause} ${limitClause}`; }
- src/services/superset-api.ts:243-270 (helper)Key helper method in SupersetApiService for executing SQL queries against Superset API, used directly in the tool handler.public async executeQuery(request: QueryRequest): Promise<QueryResult> { try { // 确保必要的参数 if (!request.database_id || !request.sql) { throw new Error('执行查询需要提供 database_id 和 sql 参数'); } // 设置默认值 const queryRequest: QueryRequest = { ...request, client_id: request.client_id || `client_${Date.now()}`, sql_editor_id: request.sql_editor_id || `editor_${Date.now()}`, runAsync: request.runAsync !== undefined ? request.runAsync : false, json: true }; const response = await this.client.post<QueryResult>('/api/v1/sqllab/execute/', queryRequest); console.log(response) if (!response.success) { throw new Error(response.error?.message || '执行查询失败'); } return response.data as QueryResult; } catch (error) { console.error('执行查询失败:', error); throw error; } }