Metabase MCP Server
by imlewc
- src
#!/usr/bin/env node
// 为老版本 Node.js 添加 AbortController polyfill
import AbortController from 'abort-controller';
global.AbortController = global.AbortController || AbortController;
/**
* Metabase MCP 服务器
* 实现与 Metabase API 的交互,提供以下功能:
* - 获取仪表板列表
* - 获取问题列表
* - 获取数据库列表
* - 执行问题查询
* - 获取仪表板详情
*/
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
ListResourcesRequestSchema,
ReadResourceRequestSchema,
CallToolRequestSchema,
ListResourcesResult,
ReadResourceResult,
ResourceSchema,
ToolSchema
} from "@modelcontextprotocol/sdk/types.js";
import { z } from "zod";
import axios, { AxiosInstance } from "axios";
// 自定义错误枚举
enum ErrorCode {
InternalError = "internal_error",
InvalidRequest = "invalid_request",
InvalidParams = "invalid_params",
MethodNotFound = "method_not_found"
}
// 自定义错误类
class McpError extends Error {
code: ErrorCode;
constructor(code: ErrorCode, message: string) {
super(message);
this.code = code;
this.name = "McpError";
}
}
// 从环境变量获取 Metabase 配置
const METABASE_URL = process.env.METABASE_URL;
const METABASE_USERNAME = process.env.METABASE_USERNAME;
const METABASE_PASSWORD = process.env.METABASE_PASSWORD;
if (!METABASE_URL || !METABASE_USERNAME || !METABASE_PASSWORD) {
throw new Error("METABASE_URL, METABASE_USERNAME, and METABASE_PASSWORD environment variables are required");
}
// 创建自定义 Schema 对象,使用 z.object
const ListResourceTemplatesRequestSchema = z.object({
method: z.literal("resources/list_templates")
});
const ListToolsRequestSchema = z.object({
method: z.literal("tools/list")
});
class MetabaseServer {
private server: Server;
private axiosInstance: AxiosInstance;
private sessionToken: string | null = null;
constructor() {
this.server = new Server(
{
name: "metabase-server",
version: "0.1.0",
},
{
capabilities: {
resources: {},
tools: {},
},
}
);
this.axiosInstance = axios.create({
baseURL: METABASE_URL,
headers: {
"Content-Type": "application/json",
},
});
this.setupResourceHandlers();
this.setupToolHandlers();
// Enhanced error handling with logging
this.server.onerror = (error: Error) => {
this.logError('Server Error', error);
};
process.on('SIGINT', async () => {
this.logInfo('Shutting down server...');
await this.server.close();
process.exit(0);
});
}
// Add logging utilities
private logInfo(message: string, data?: unknown) {
const logMessage = {
timestamp: new Date().toISOString(),
level: 'info',
message,
data
};
console.error(JSON.stringify(logMessage));
// MCP SDK changed, can't directly access session
try {
// Use current session if available
console.error(`INFO: ${message}`);
} catch (e) {
// Ignore if session not available
}
}
private logError(message: string, error: unknown) {
const errorObj = error as Error;
const apiError = error as { response?: { data?: { message?: string } }, message?: string };
const logMessage = {
timestamp: new Date().toISOString(),
level: 'error',
message,
error: errorObj.message || 'Unknown error',
stack: errorObj.stack
};
console.error(JSON.stringify(logMessage));
// MCP SDK changed, can't directly access session
try {
console.error(`ERROR: ${message} - ${errorObj.message || 'Unknown error'}`);
} catch (e) {
// Ignore if session not available
}
}
/**
* 获取 Metabase 会话令牌
*/
private async getSessionToken(): Promise<string> {
if (this.sessionToken) {
return this.sessionToken;
}
this.logInfo('Authenticating with Metabase...');
try {
const response = await this.axiosInstance.post('/api/session', {
username: METABASE_USERNAME,
password: METABASE_PASSWORD,
});
this.sessionToken = response.data.id;
// 设置默认请求头
this.axiosInstance.defaults.headers.common['X-Metabase-Session'] = this.sessionToken;
this.logInfo('Successfully authenticated with Metabase');
return this.sessionToken as string;
} catch (error) {
this.logError('Authentication failed', error);
throw new McpError(
ErrorCode.InternalError,
'Failed to authenticate with Metabase'
);
}
}
/**
* 设置资源处理程序
*/
private setupResourceHandlers() {
this.server.setRequestHandler(ListResourcesRequestSchema, async (request) => {
this.logInfo('Listing resources...', { requestStructure: JSON.stringify(request) });
await this.getSessionToken();
try {
// 获取仪表板列表
const dashboardsResponse = await this.axiosInstance.get('/api/dashboard');
this.logInfo('Successfully listed resources', { count: dashboardsResponse.data.length });
// 将仪表板作为资源返回
return {
resources: dashboardsResponse.data.map((dashboard: any) => ({
uri: `metabase://dashboard/${dashboard.id}`,
mimeType: "application/json",
name: dashboard.name,
description: `Metabase dashboard: ${dashboard.name}`
}))
};
} catch (error) {
this.logError('Failed to list resources', error);
throw new McpError(
ErrorCode.InternalError,
'Failed to list Metabase resources'
);
}
});
// 资源模板
this.server.setRequestHandler(ListResourceTemplatesRequestSchema, async () => {
return {
resourceTemplates: [
{
uriTemplate: 'metabase://dashboard/{id}',
name: 'Dashboard by ID',
mimeType: 'application/json',
description: 'Get a Metabase dashboard by its ID',
},
{
uriTemplate: 'metabase://card/{id}',
name: 'Card by ID',
mimeType: 'application/json',
description: 'Get a Metabase question/card by its ID',
},
{
uriTemplate: 'metabase://database/{id}',
name: 'Database by ID',
mimeType: 'application/json',
description: 'Get a Metabase database by its ID',
},
],
};
});
// 读取资源
this.server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
this.logInfo('Reading resource...', { requestStructure: JSON.stringify(request) });
await this.getSessionToken();
const uri = request.params?.uri;
let match;
try {
// 处理仪表板资源
if ((match = uri.match(/^metabase:\/\/dashboard\/(\d+)$/))) {
const dashboardId = match[1];
const response = await this.axiosInstance.get(`/api/dashboard/${dashboardId}`);
return {
contents: [{
uri: request.params?.uri,
mimeType: "application/json",
text: JSON.stringify(response.data, null, 2)
}]
};
}
// 处理问题/卡片资源
else if ((match = uri.match(/^metabase:\/\/card\/(\d+)$/))) {
const cardId = match[1];
const response = await this.axiosInstance.get(`/api/card/${cardId}`);
return {
contents: [{
uri: request.params?.uri,
mimeType: "application/json",
text: JSON.stringify(response.data, null, 2)
}]
};
}
// 处理数据库资源
else if ((match = uri.match(/^metabase:\/\/database\/(\d+)$/))) {
const databaseId = match[1];
const response = await this.axiosInstance.get(`/api/database/${databaseId}`);
return {
contents: [{
uri: request.params?.uri,
mimeType: "application/json",
text: JSON.stringify(response.data, null, 2)
}]
};
}
else {
throw new McpError(
ErrorCode.InvalidRequest,
`Invalid URI format: ${uri}`
);
}
} catch (error) {
if (axios.isAxiosError(error)) {
throw new McpError(
ErrorCode.InternalError,
`Metabase API error: ${error.response?.data?.message || error.message}`
);
}
throw error;
}
});
}
/**
* 设置工具处理程序
*/
private setupToolHandlers() {
this.server.setRequestHandler(ListToolsRequestSchema, async () => {
return {
tools: [
{
name: "list_dashboards",
description: "List all dashboards in Metabase",
inputSchema: {
type: "object",
properties: {}
}
},
{
name: "list_cards",
description: "List all questions/cards in Metabase",
inputSchema: {
type: "object",
properties: {}
}
},
{
name: "list_databases",
description: "List all databases in Metabase",
inputSchema: {
type: "object",
properties: {}
}
},
{
name: "execute_card",
description: "Execute a Metabase question/card and get results",
inputSchema: {
type: "object",
properties: {
card_id: {
type: "number",
description: "ID of the card/question to execute"
},
parameters: {
type: "object",
description: "Optional parameters for the query"
}
},
required: ["card_id"]
}
},
{
name: "get_dashboard_cards",
description: "Get all cards in a dashboard",
inputSchema: {
type: "object",
properties: {
dashboard_id: {
type: "number",
description: "ID of the dashboard"
}
},
required: ["dashboard_id"]
}
},
{
name: "execute_query",
description: "Execute a SQL query against a Metabase database",
inputSchema: {
type: "object",
properties: {
database_id: {
type: "number",
description: "ID of the database to query"
},
query: {
type: "string",
description: "SQL query to execute"
},
native_parameters: {
type: "array",
description: "Optional parameters for the query",
items: {
type: "object"
}
}
},
required: ["database_id", "query"]
}
}
]
};
});
this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
this.logInfo('Calling tool...', { requestStructure: JSON.stringify(request) });
await this.getSessionToken();
try {
switch (request.params?.name) {
case "list_dashboards": {
const response = await this.axiosInstance.get('/api/dashboard');
return {
content: [{
type: "text",
text: JSON.stringify(response.data, null, 2)
}]
};
}
case "list_cards": {
const response = await this.axiosInstance.get('/api/card');
return {
content: [{
type: "text",
text: JSON.stringify(response.data, null, 2)
}]
};
}
case "list_databases": {
const response = await this.axiosInstance.get('/api/database');
return {
content: [{
type: "text",
text: JSON.stringify(response.data, null, 2)
}]
};
}
case "execute_card": {
const cardId = request.params?.arguments?.card_id;
if (!cardId) {
throw new McpError(
ErrorCode.InvalidParams,
"Card ID is required"
);
}
const parameters = request.params?.arguments?.parameters || {};
const response = await this.axiosInstance.post(`/api/card/${cardId}/query`, { parameters });
return {
content: [{
type: "text",
text: JSON.stringify(response.data, null, 2)
}]
};
}
case "get_dashboard_cards": {
const dashboardId = request.params?.arguments?.dashboard_id;
if (!dashboardId) {
throw new McpError(
ErrorCode.InvalidParams,
"Dashboard ID is required"
);
}
const response = await this.axiosInstance.get(`/api/dashboard/${dashboardId}`);
return {
content: [{
type: "text",
text: JSON.stringify(response.data.cards, null, 2)
}]
};
}
case "execute_query": {
const databaseId = request.params?.arguments?.database_id;
const query = request.params?.arguments?.query;
const nativeParameters = request.params?.arguments?.native_parameters || [];
if (!databaseId) {
throw new McpError(
ErrorCode.InvalidParams,
"Database ID is required"
);
}
if (!query) {
throw new McpError(
ErrorCode.InvalidParams,
"SQL query is required"
);
}
// 构建查询请求体
const queryData = {
type: "native",
native: {
query: query,
template_tags: {}
},
parameters: nativeParameters,
database: databaseId
};
const response = await this.axiosInstance.post('/api/dataset', queryData);
return {
content: [{
type: "text",
text: JSON.stringify(response.data, null, 2)
}]
};
}
default:
return {
content: [
{
type: "text",
text: `Unknown tool: ${request.params?.name}`
}
],
isError: true
};
}
} catch (error) {
if (axios.isAxiosError(error)) {
return {
content: [{
type: "text",
text: `Metabase API error: ${error.response?.data?.message || error.message}`
}],
isError: true
};
}
throw error;
}
});
}
async run() {
try {
this.logInfo('Starting Metabase MCP server...');
const transport = new StdioServerTransport();
await this.server.connect(transport);
this.logInfo('Metabase MCP server running on stdio');
} catch (error) {
this.logError('Failed to start server', error);
throw error;
}
}
}
// Add global error handlers
process.on('uncaughtException', (error: Error) => {
console.error(JSON.stringify({
timestamp: new Date().toISOString(),
level: 'fatal',
message: 'Uncaught Exception',
error: error.message,
stack: error.stack
}));
process.exit(1);
});
process.on('unhandledRejection', (reason: unknown, promise: Promise<unknown>) => {
const errorMessage = reason instanceof Error ? reason.message : String(reason);
console.error(JSON.stringify({
timestamp: new Date().toISOString(),
level: 'fatal',
message: 'Unhandled Rejection',
error: errorMessage
}));
});
const server = new MetabaseServer();
server.run().catch(console.error);