import {McpServer} from "@modelcontextprotocol/sdk/server/mcp.js";
import {StdioServerTransport} from "@modelcontextprotocol/sdk/server/stdio.js";
import {z} from "zod";
import mysql from "mysql2/promise";
// The MySQL connection string is mandatory: abort startup when it is missing or empty.
const { DATABASE_URL: databaseUrl } = process.env;
if (databaseUrl === undefined || databaseUrl === "") {
  console.error("DATABASE_URL environment variable is not set.");
  process.exit(1);
}
// Connection pool shared by every tool handler in this file.
let pool: mysql.Pool;
try {
  // createPool only configures the pool; mysql2 opens connections on demand,
  // so reaching the next line does not prove the database is reachable.
  pool = mysql.createPool(databaseUrl);
  // Status output goes to stderr: this server talks MCP over stdio, so stdout
  // must carry protocol messages only.
  console.error("MySQL connection pool created.");
} catch (error) {
  console.error("Failed to connect to MySQL database:", error);
  process.exit(1);
}
/**
 * Validates a table name before it is spliced into a DESCRIBE/SHOW statement.
 *
 * Only ASCII letters, digits and underscores are accepted; the empty string is
 * rejected as well.
 *
 * @param name - Candidate table name.
 * @returns The same name, unchanged, when it passes validation.
 * @throws Error when the name contains any other character or is empty.
 */
const sanitizeTableName = (name: string): string => {
  const isSafeIdentifier = /^[a-zA-Z0-9_]+$/.test(name);
  if (isSafeIdentifier) {
    return name;
  }
  throw new Error("Invalid table name. Table names must be alphanumeric with underscores.");
};
// MCP server instance on which the tools in this file are registered.
const server = new McpServer({
  name: "db-server",
  version: "1.0.0",
});
// Tool "list_table": returns the names of all tables in the connected database
// as a pretty-printed JSON array of strings.
server.registerTool(
  "list_table",
  {
    title: "list_tables",
    description: "Lists all tables in the connected MySQL database.",
    inputSchema: z.object({}), // must be a zod schema
    // outputSchema: z.array(z.string()), // would declare the output shape
  },
  async () => {
    try {
      const [resultRows] = await pool.execute("SHOW TABLES");

      // Each row is expected to look like { "Tables_in_<db>": "<table>" };
      // keep only its first value so the result is a flat list of names.
      const tableNames: unknown[] = [];
      if (Array.isArray(resultRows)) {
        for (const row of resultRows) {
          tableNames.push(Object.values(row)[0]);
        }
      }

      // Tool results are returned as { content: [{ type: "text", text }] },
      // so the array is serialized into a single text item.
      const listing = JSON.stringify(tableNames, null, 2);
      return {
        content: [{ type: "text", text: listing }],
      };
    } catch (error) {
      console.error("Error listing tables:", error);
      const reason = error instanceof Error ? error.message : String(error);
      return {
        content: [{ type: "text", text: `Error listing tables: ${reason}` }],
        isError: true,
      };
    }
  }
);
// Tool "describe_table": returns the rows produced by DESCRIBE for one table,
// serialized as pretty-printed JSON.
//
// Input:  { tableName } - must consist of letters, digits and underscores only.
// Output: a single text item holding the JSON, or an isError result carrying
//         the failure message (invalid name, unknown table, connection error).
server.registerTool(
  "describe_table",
  {
    title: "describe_table",
    description: "Describes the columns of a specified table in the MySQL database.",
    inputSchema: z.object({
      tableName: z.string().describe("The name of the table to describe."),
    }),
    // outputSchema is intentionally omitted to avoid "Output validation error".
  },
  async (args) => {
    const { tableName } = args;
    try {
      // The table name is an identifier, so it cannot be bound as a `?`
      // placeholder and has to be interpolated into the SQL text. Validate it
      // first so a crafted name cannot break out of the backtick quoting.
      // This runs inside the try block so that a rejected name is reported to
      // the caller as a normal tool error.
      const safeTableName = sanitizeTableName(tableName);
      const [rows] = await pool.execute(`DESCRIBE \`${safeTableName}\``);
      return {
        content: [
          {
            type: "text",
            text: JSON.stringify(rows, null, 2),
          },
        ],
      };
    } catch (error) {
      console.error(`Error describing table ${tableName}:`, error);
      // Surface the failure to the caller instead of throwing.
      return {
        content: [
          {
            type: "text",
            text: `Error describing table '${tableName}': ${error instanceof Error ? error.message : String(error)}`,
          },
        ],
        isError: true,
      };
    }
  }
);
// Tool "execute_read_query": runs a caller-supplied SELECT / SHOW / DESCRIBE
// statement and returns the result rows as pretty-printed JSON.
//
// Input:  { query, params? } - params are bound to the `?` placeholders in query.
// Output: a single text item holding the JSON, or an isError result when the
//         statement is rejected or fails to execute.
//
// NOTE(review): the read-only check below is a best-effort textual filter, not
// a SQL parser (for example, comments inside the statement are not analysed).
// The database account should itself be restricted to read-only privileges.
server.registerTool(
  "execute_read_query",
  {
    title: "execute_read_query",
    description: "Executes a read-only SQL query against the MySQL database. ONLY SELECT, SHOW, DESCRIBE statements are allowed to prevent write operations.",
    inputSchema: z.object({
      query: z.string().describe("The SQL query to execute."),
      params: z.array(z.any()).optional().describe("Optional parameters for the query."),
    }),
    // outputSchema is intentionally omitted to avoid output validation errors.
  },
  async (args) => {
    // Default params to an empty list so execute() never receives undefined.
    const { query, params = [] } = args;

    // 1. Safety checks on the normalized statement text.
    const normalizedQuery = query.toLowerCase().trim();

    // The statement must start with an allowed keyword; \b makes sure the
    // keyword is a whole word rather than the prefix of a longer identifier.
    if (!/^(select|show|describe)\b/.test(normalizedQuery)) {
      return {
        content: [{
          type: "text",
          text: "Error: Only SELECT, SHOW, and DESCRIBE queries are allowed in read-only mode.",
        }],
        isError: true,
      };
    }

    // SELECT ... INTO OUTFILE / DUMPFILE starts with SELECT but writes a file
    // on the database host, so it is not a read-only statement.
    if (/\binto\s+(outfile|dumpfile)\b/.test(normalizedQuery)) {
      return {
        content: [{
          type: "text",
          text: "Error: SELECT ... INTO OUTFILE/DUMPFILE is not allowed in read-only mode.",
        }],
        isError: true,
      };
    }

    try {
      // 2. Run the statement with its bound parameters.
      const [rows] = await pool.execute(query, params);

      // 3. Serialize the rows. JSON.stringify throws on bigint values, so the
      //    replacer converts any bigint to its decimal string form.
      return {
        content: [
          {
            type: "text",
            text: JSON.stringify(
              rows,
              (_key, value) => (typeof value === "bigint" ? value.toString() : value),
              2
            ),
          },
        ],
      };
    } catch (error) {
      console.error("Error executing query:", error);
      // 4. Report the failure to the caller instead of throwing.
      return {
        content: [
          {
            type: "text",
            text: `Failed to execute query: ${error instanceof Error ? error.message : String(error)}`,
          },
        ],
        isError: true,
      };
    }
  }
);
// Serve MCP requests over the process's stdin/stdout.
const transport = new StdioServerTransport();
await server.connect(transport);
// With the stdio transport, stdout is the protocol channel, so the startup
// notice is written to stderr to avoid corrupting the message stream.
console.error("MCP Server initialized and listening for commands.");