Skip to main content
Glama
index.ts (6.83 kB)
import {McpServer} from "@modelcontextprotocol/sdk/server/mcp.js";
import {StdioServerTransport} from "@modelcontextprotocol/sdk/server/stdio.js";
import {z} from "zod";
import mysql from "mysql2/promise";

// This server speaks MCP over stdio: stdout carries the protocol messages, so
// every diagnostic in this file is written to stderr (console.error) instead
// of stdout (console.log) to avoid corrupting the message stream.

const databaseUrl = process.env.DATABASE_URL;
if (!databaseUrl) {
  console.error("DATABASE_URL environment variable is not set.");
  process.exit(1);
}

let pool: mysql.Pool;
try {
  // createPool only configures the pool; connections are opened when the
  // first query runs, so success here does not prove the database is reachable.
  pool = mysql.createPool(databaseUrl);
  console.error("MySQL connection pool created.");
} catch (error) {
  console.error("Failed to connect to MySQL database:", error);
  process.exit(1);
}

/**
 * Validates a table name before it is interpolated into a DESCRIBE statement.
 *
 * @param name - Table name supplied by the tool caller.
 * @returns The same name, unchanged, when it is safe to interpolate.
 * @throws Error when the name contains anything other than ASCII letters,
 *   digits, or underscores (this also rejects the empty string).
 */
const sanitizeTableName = (name: string): string => {
  if (!/^[a-zA-Z0-9_]+$/.test(name)) {
    throw new Error("Invalid table name. Table names must be alphanumeric with underscores.");
  }
  return name;
};

/** Shape returned by every tool handler: `{ content: [{ type: "text", text }] }`. */
type ToolTextResult = {
  content: {type: "text"; text: string}[];
  isError?: boolean;
};

/** Wraps a plain string in a successful tool result. */
const textResult = (text: string): ToolTextResult => ({
  content: [{type: "text" as const, text}],
});

/** Wraps a plain string in a tool result flagged as an error. */
const errorResult = (text: string): ToolTextResult => ({
  content: [{type: "text" as const, text}],
  isError: true,
});

/** Extracts a human-readable message from a caught value of unknown type. */
const describeError = (error: unknown): string =>
  error instanceof Error ? error.message : String(error);

/**
 * Pretty-prints a query result as JSON.
 * BigInt values are rendered as strings because JSON.stringify throws on them.
 */
const toJsonText = (value: unknown): string =>
  JSON.stringify(value, (_key, v) => (typeof v === "bigint" ? v.toString() : v), 2);

// A read query must start with one of these keywords (as a whole word).
const READ_ONLY_PREFIX = /^(select|show|describe)\b/i;
// SELECT ... INTO OUTFILE / DUMPFILE writes a file on the database host, so it
// is not read-only even though the statement starts with SELECT.
const FILE_WRITE_CLAUSE = /\binto\s+(outfile|dumpfile)\b/i;

const server = new McpServer({name: "db-server", version: "1.0.0"});

// NOTE(review): the registered tool name is "list_table" (singular) while the
// title is "list_tables". The name is kept as-is because clients call tools by
// name; confirm whether the mismatch is intentional.
server.registerTool(
  "list_table",
  {
    title: "list_tables",
    description: "Lists all tables in the connected MySQL database.",
    inputSchema: z.object({}), // must be a zod schema
    // outputSchema: z.array(z.string()), // would define the output schema
  },
  async () => {
    try {
      const [rows] = await pool.execute(`SHOW TABLES`);

      // Each row looks like { "Tables_in_<dbname>": "table1" }; keep only the
      // value so the caller receives a flat array: ["table1", "table2"].
      const tableNames = Array.isArray(rows)
        ? rows.map((row) => Object.values(row)[0])
        : [];

      return textResult(toJsonText(tableNames));
    } catch (error) {
      console.error("Error listing tables:", error);
      return errorResult(`Error listing tables: ${describeError(error)}`);
    }
  }
);

server.registerTool(
  "describe_table",
  {
    title: "describe_table",
    description: "Describes the columns of a specified table in the MySQL database.",
    inputSchema: z.object({
      tableName: z.string().describe("The name of the table to describe."),
    }),
    // outputSchema intentionally omitted to avoid "Output validation error".
  },
  async (args) => {
    const {tableName} = args;

    try {
      // The table name is interpolated into the statement rather than bound
      // as a parameter, so it is validated first; a name containing a
      // backtick could otherwise break out of the quoted identifier.
      const safeName = sanitizeTableName(tableName);
      const [rows] = await pool.execute(`DESCRIBE \`${safeName}\``);

      return textResult(toJsonText(rows));
    } catch (error) {
      console.error(`Error describing table ${tableName}:`, error);
      // Report the failure (including an invalid table name) back to the caller.
      return errorResult(`Error describing table '${tableName}': ${describeError(error)}`);
    }
  }
);

server.registerTool(
  "execute_read_query",
  {
    title: "execute_read_query",
    description:
      "Executes a read-only SQL query against the MySQL database. ONLY SELECT, SHOW, DESCRIBE statements are allowed to prevent write operations.",
    inputSchema: z.object({
      query: z.string().describe("The SQL query to execute."),
      params: z.array(z.any()).optional().describe("Optional parameters for the query."),
    }),
    // outputSchema intentionally omitted to avoid validation errors.
  },
  async (args) => {
    // Default params to an empty array so execute() never receives undefined.
    const {query, params = []} = args;

    // 1. Safety checks: allowed leading keyword, and no file-writing clause.
    const trimmedQuery = query.trim();
    if (!READ_ONLY_PREFIX.test(trimmedQuery)) {
      return errorResult(
        "Error: Only SELECT, SHOW, and DESCRIBE queries are allowed in read-only mode."
      );
    }
    if (FILE_WRITE_CLAUSE.test(trimmedQuery)) {
      return errorResult(
        "Error: SELECT ... INTO OUTFILE/DUMPFILE is not allowed in read-only mode."
      );
    }

    try {
      // 2. Run the query with bound parameters.
      const [rows] = await pool.execute(query, params);

      // 3. Return the rows as JSON text (BigInt-safe).
      return textResult(toJsonText(rows));
    } catch (error) {
      console.error("Error executing query:", error);
      // 4. Report the failure back to the caller.
      return errorResult(`Failed to execute query: ${describeError(error)}`);
    }
  }
);

// Connect to transport
const transport = new StdioServerTransport();
await server.connect(transport);
console.error("MCP Server initialized and listening for commands.");

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/gakkiismywife/mysql-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.