Skip to main content
Glama

MySQL MCP Server

by SuMiaoALi
index.ts (13.3 kB)
#!/usr/bin/env node

/**
 * MySQL MCP Server
 *
 * Exposes a MySQL database over the Model Context Protocol:
 * - run SQL statements (SELECT / INSERT / SHOW / DESCRIBE)
 * - list the tables of the configured schema
 * - describe the columns of a table
 */

import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListResourcesRequestSchema,
  ListToolsRequestSchema,
  ReadResourceRequestSchema,
  ErrorCode,
  McpError,
} from "@modelcontextprotocol/sdk/types.js";
import mysql from 'mysql2/promise';

/** Default and upper bound for the LIMIT appended to SELECT statements. */
const DEFAULT_ROW_LIMIT = 100;
const MAX_ROW_LIMIT = 1000;

/** Leading keywords accepted by the `execute_query` tool. */
const READ_KEYWORDS = new Set(['SELECT', 'SHOW', 'DESCRIBE', 'DESC']);
const WRITE_KEYWORDS = new Set(['INSERT']);

/**
 * Parses a TCP port from an environment value.
 *
 * @param raw - Raw environment value, possibly undefined or empty.
 * @param fallback - Port returned when `raw` is not an integer in 1..65535.
 */
function parsePort(raw: string | undefined, fallback: number): number {
  const port = Number.parseInt(raw ?? '', 10);
  return Number.isInteger(port) && port >= 1 && port <= 65535 ? port : fallback;
}

// MySQL connection settings. `||` (not `??`) is intentional: an environment
// variable that is set but empty falls back to the default as well.
const MYSQL_CONFIG = {
  host: process.env.MYSQL_HOST || 'localhost',
  port: parsePort(process.env.MYSQL_PORT, 3306),
  user: process.env.MYSQL_USER || 'root',
  password: process.env.MYSQL_PASSWORD || '',
  database: process.env.MYSQL_DB || 'mysql',
  charset: 'utf8mb4',
  timezone: process.env.MYSQL_TIMEZONE || '+00:00'
};

/** Row shape produced by {@link getTablesInfo}; only the fields read in this file are typed. */
interface TableInfo {
  table_name: string;
  table_comment?: string | null;
  [column: string]: unknown;
}

/** Row shape produced by {@link getTableStructure}. */
type ColumnInfo = Record<string, unknown>;

// Shared connection pool; assigned by initializeDatabase() before any query runs.
let connectionPool: mysql.Pool;

/**
 * Creates the shared connection pool from MYSQL_CONFIG.
 */
function initializeDatabase() {
  connectionPool = mysql.createPool({
    ...MYSQL_CONFIG,
    waitForConnections: true,
    connectionLimit: 10,
    queueLimit: 0,
  });
}

/**
 * Executes one SQL statement through the pool.
 *
 * @param sql - Statement text; `?` placeholders are bound from `params`.
 * @param params - Values for the placeholders.
 * @returns The first element of the pool's `execute` result.
 * @throws McpError (InternalError) wrapping any failure raised while executing.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- result shape depends on the statement
async function executeQuery(sql: string, params: any[] = []): Promise<any> {
  try {
    const [rows] = await connectionPool.execute(sql, params);
    return rows;
  } catch (error) {
    console.error('SQL执行错误:', error);
    throw new McpError(
      ErrorCode.InternalError,
      `SQL执行失败: ${error instanceof Error ? error.message : String(error)}`
    );
  }
}

/**
 * Lists the tables of the configured schema from information_schema.TABLES.
 */
async function getTablesInfo(): Promise<TableInfo[]> {
  const sql = `
    SELECT
      TABLE_NAME as table_name,
      TABLE_TYPE as table_type,
      ENGINE as engine,
      TABLE_ROWS as table_rows,
      TABLE_COMMENT as table_comment
    FROM information_schema.TABLES
    WHERE TABLE_SCHEMA = ?
    ORDER BY TABLE_NAME
  `;
  return executeQuery(sql, [MYSQL_CONFIG.database]);
}

/**
 * Lists the columns of one table from information_schema.COLUMNS.
 *
 * @param tableName - Table to describe; bound as a parameter, never interpolated.
 * @returns One row per column in ordinal order; empty when nothing matches.
 */
async function getTableStructure(tableName: string): Promise<ColumnInfo[]> {
  const sql = `
    SELECT
      COLUMN_NAME as column_name,
      DATA_TYPE as data_type,
      IS_NULLABLE as is_nullable,
      COLUMN_DEFAULT as column_default,
      COLUMN_COMMENT as column_comment,
      COLUMN_KEY as column_key
    FROM information_schema.COLUMNS
    WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?
    ORDER BY ORDINAL_POSITION
  `;
  return executeQuery(sql, [MYSQL_CONFIG.database, tableName]);
}

/**
 * Quotes a MySQL identifier with backticks so it can be embedded in SQL text.
 *
 * Embedded backticks are doubled. An identifier that is already wrapped in
 * backticks is unwrapped first, so pre-quoted names keep working.
 *
 * @param identifier - Table or column name supplied by the caller.
 * @param allowQualified - When true, `db.table` is split on `.` and each part
 *   is quoted separately.
 * @throws McpError (InvalidParams) when a part is empty or contains a NUL character.
 */
function quoteIdentifier(identifier: string, allowQualified = false): string {
  const parts = allowQualified ? identifier.split('.') : [identifier];
  return parts
    .map((part) => {
      const trimmed = part.trim();
      const alreadyQuoted =
        trimmed.length >= 2 && trimmed.startsWith('`') && trimmed.endsWith('`');
      const bare = alreadyQuoted ? trimmed.slice(1, -1).replace(/``/g, '`') : trimmed;
      if (!bare || bare.includes('\0')) {
        throw new McpError(ErrorCode.InvalidParams, `无效的标识符: ${identifier}`);
      }
      return '`' + bare.replace(/`/g, '``') + '`';
    })
    .join('.');
}

/**
 * Normalises the `limit` tool argument to an integer in 1..MAX_ROW_LIMIT.
 * Missing, non-numeric and non-positive values yield DEFAULT_ROW_LIMIT.
 */
function normalizeLimit(raw: unknown): number {
  const parsed = Math.trunc(Number(raw));
  if (!Number.isFinite(parsed) || parsed < 1) {
    return DEFAULT_ROW_LIMIT;
  }
  return Math.min(parsed, MAX_ROW_LIMIT);
}

/** Reads a tool argument as text; anything other than a string or number becomes ''. */
function readText(value: unknown): string {
  if (typeof value === 'string') {
    return value;
  }
  return typeof value === 'number' ? String(value) : '';
}

/** Wraps a payload as the single pretty-printed JSON text item of a tool result. */
function jsonContent(payload: unknown) {
  return {
    content: [{ type: "text" as const, text: JSON.stringify(payload, null, 2) }]
  };
}

/**
 * Turns a resource URI into a path such as `/table/users`.
 *
 * For a non-special scheme like `mysql:`, the URL parser treats the segment
 * after `//` as the host, so `mysql://table/users` has host `table` and
 * pathname `/users`. Host and pathname are therefore joined back together.
 *
 * @throws McpError (InvalidRequest) when `uri` cannot be parsed.
 */
function resolveResourcePath(uri: string): string {
  let parsed: URL;
  try {
    parsed = new URL(uri);
  } catch {
    throw new McpError(ErrorCode.InvalidRequest, `无效的资源URI: ${uri}`);
  }
  return parsed.host ? `/${parsed.host}${parsed.pathname}` : parsed.pathname;
}

/**
 * MCP server instance advertising the resources and tools capabilities.
 */
const server = new Server(
  {
    name: "mysql-mcp-server",
    version: "0.1.0",
  },
  {
    capabilities: {
      resources: {},
      tools: {},
    },
  }
);

/**
 * Resource listing: one database-info resource plus one resource per table.
 * Failures are logged and reported as an empty list.
 */
server.setRequestHandler(ListResourcesRequestSchema, async () => {
  try {
    const tables = await getTablesInfo();
    const resources = [
      {
        uri: "mysql://database/info",
        mimeType: "application/json",
        name: "数据库信息",
        description: `${MYSQL_CONFIG.database} 数据库的基本信息和表列表`
      },
      ...tables.map((table) => ({
        // Percent-encode so names with reserved characters survive URL parsing.
        uri: `mysql://table/${encodeURIComponent(table.table_name)}`,
        mimeType: "application/json",
        name: `表: ${table.table_name}`,
        description: `${table.table_name} 表的结构信息 ${table.table_comment ? `- ${table.table_comment}` : ''}`
      }))
    ];
    return { resources };
  } catch (error) {
    console.error('获取资源列表失败:', error);
    return { resources: [] };
  }
});

/**
 * Resource reading: `mysql://database/info` returns the schema overview and
 * `mysql://table/<name>` returns the column list of one table.
 */
server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
  const uri = request.params.uri;
  try {
    const resourcePath = resolveResourcePath(uri);

    if (resourcePath === '/database/info' || resourcePath === '/info') {
      const tables = await getTablesInfo();
      const dbInfo = {
        database: MYSQL_CONFIG.database,
        host: MYSQL_CONFIG.host,
        port: MYSQL_CONFIG.port,
        table_count: tables.length,
        tables: tables
      };
      return {
        contents: [{
          uri,
          mimeType: "application/json",
          text: JSON.stringify(dbInfo, null, 2)
        }]
      };
    }

    if (resourcePath.startsWith('/table/')) {
      const tableName = decodeURIComponent(resourcePath.slice('/table/'.length));
      if (!tableName) {
        throw new McpError(ErrorCode.InvalidRequest, "表名不能为空");
      }
      const structure = await getTableStructure(tableName);
      return {
        contents: [{
          uri,
          mimeType: "application/json",
          text: JSON.stringify({ table_name: tableName, columns: structure }, null, 2)
        }]
      };
    }

    throw new McpError(ErrorCode.InvalidRequest, `未知的资源路径: ${resourcePath}`);
  } catch (error) {
    if (error instanceof McpError) {
      throw error;
    }
    throw new McpError(
      ErrorCode.InternalError,
      `读取资源失败: ${error instanceof Error ? error.message : String(error)}`
    );
  }
});

/**
 * Tool listing: static descriptions and input schemas of the four tools.
 */
server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: "execute_query",
        description: "执行SQL查询语句(支持SELECT、INSERT、SHOW、DESCRIBE)",
        inputSchema: {
          type: "object",
          properties: {
            sql: {
              type: "string",
              description: "要执行的SQL语句(支持SELECT、INSERT、SHOW、DESCRIBE)"
            },
            limit: {
              type: "number",
              description: "限制返回的行数(默认100,最大1000,仅对SELECT有效)",
              minimum: 1,
              maximum: 1000,
              default: 100
            }
          },
          required: ["sql"]
        }
      },
      {
        name: "get_table_info",
        description: "获取指定表的详细信息和结构",
        inputSchema: {
          type: "object",
          properties: {
            table_name: {
              type: "string",
              description: "表名"
            }
          },
          required: ["table_name"]
        }
      },
      {
        name: "insert_data",
        description: "向指定表插入数据(便捷的INSERT操作)",
        inputSchema: {
          type: "object",
          properties: {
            table_name: {
              type: "string",
              description: "目标表名"
            },
            data: {
              type: "object",
              description: "要插入的数据,键值对形式,键为列名,值为对应的数据"
            }
          },
          required: ["table_name", "data"]
        }
      },
      {
        name: "list_tables",
        description: "列出数据库中的所有表",
        inputSchema: {
          type: "object",
          properties: {},
          additionalProperties: false
        }
      }
    ]
  };
});

/**
 * `execute_query` tool: runs one SELECT / INSERT / SHOW / DESCRIBE statement.
 * A SELECT without a LIMIT keyword gets a bounded LIMIT appended.
 */
async function handleExecuteQuery(args: Record<string, unknown>) {
  const sql = readText(args.sql).trim();
  if (!sql) {
    throw new McpError(ErrorCode.InvalidParams, "SQL语句不能为空");
  }

  // Safety check: compare the whole leading keyword, not just a prefix,
  // so that e.g. "SELECTED" is not mistaken for SELECT.
  const sqlUpper = sql.toUpperCase();
  const keyword = /^[A-Z]+/.exec(sqlUpper)?.[0] ?? '';
  const isRead = READ_KEYWORDS.has(keyword);
  if (!isRead && !WRITE_KEYWORDS.has(keyword)) {
    throw new McpError(ErrorCode.InvalidParams, "仅支持SELECT、INSERT、SHOW、DESCRIBE操作");
  }

  // Cap SELECT results. The word-boundary match is a heuristic: identifiers
  // such as `limit_value` no longer count as an existing LIMIT clause.
  let finalSql = sql;
  if (keyword === 'SELECT' && !/\bLIMIT\b/.test(sqlUpper)) {
    // Drop trailing semicolons so the clause is not appended after the terminator.
    finalSql = `${sql.replace(/[;\s]+$/, '')} LIMIT ${normalizeLimit(args.limit)}`;
  }

  const startTime = Date.now();
  const results = await executeQuery(finalSql);
  const executionTime = (Date.now() - startTime) / 1000;

  const responseData: Record<string, unknown> = {
    success: true,
    sql: finalSql,
    execution_time: executionTime
  };

  if (isRead) {
    // Read statements: return the rows and their count.
    responseData.row_count = Array.isArray(results) ? results.length : 0;
    responseData.data = results;
  } else {
    // INSERT: return the affected row count and the generated id.
    responseData.affected_rows = results.affectedRows || 0;
    responseData.insert_id = results.insertId || null;
    responseData.message = `成功插入 ${results.affectedRows || 0} 行数据`;
  }

  return jsonContent(responseData);
}

/**
 * `get_table_info` tool: returns the column list of one table.
 */
async function handleGetTableInfo(args: Record<string, unknown>) {
  const tableName = readText(args.table_name);
  if (!tableName) {
    throw new McpError(ErrorCode.InvalidParams, "表名不能为空");
  }

  const structure = await getTableStructure(tableName);
  if (structure.length === 0) {
    throw new McpError(ErrorCode.InvalidParams, `表 ${tableName} 不存在`);
  }

  return jsonContent({
    table_name: tableName,
    column_count: structure.length,
    columns: structure
  });
}

/**
 * `insert_data` tool: inserts one row built from a column -> value object.
 * Identifiers are backtick-quoted and values are bound as parameters.
 */
async function handleInsertData(args: Record<string, unknown>) {
  const tableName = readText(args.table_name);
  const data = args.data;

  if (!tableName) {
    throw new McpError(ErrorCode.InvalidParams, "表名不能为空");
  }
  if (!data || typeof data !== 'object' || Array.isArray(data) || Object.keys(data).length === 0) {
    throw new McpError(ErrorCode.InvalidParams, "插入数据不能为空");
  }

  const columns = Object.keys(data);
  const values = Object.values(data);
  const placeholders = columns.map(() => '?').join(', ');
  const columnsStr = columns.map((column) => quoteIdentifier(column)).join(', ');
  const insertSql = `INSERT INTO ${quoteIdentifier(tableName, true)} (${columnsStr}) VALUES (${placeholders})`;

  const startTime = Date.now();
  const results = await executeQuery(insertSql, values);
  const executionTime = (Date.now() - startTime) / 1000;

  return jsonContent({
    success: true,
    sql: insertSql,
    table_name: tableName,
    inserted_data: data,
    affected_rows: results.affectedRows || 0,
    insert_id: results.insertId || null,
    execution_time: executionTime,
    message: `成功向表 ${tableName} 插入 ${results.affectedRows || 0} 行数据`
  });
}

/**
 * `list_tables` tool: returns every table of the configured schema.
 */
async function handleListTables() {
  const tables = await getTablesInfo();
  return jsonContent({
    database: MYSQL_CONFIG.database,
    table_count: tables.length,
    tables: tables
  });
}

/**
 * Tool dispatch. McpError instances propagate to the client unchanged; any
 * other failure is returned as a tool result with `isError: true`.
 */
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const args: Record<string, unknown> = request.params.arguments ?? {};
  try {
    switch (request.params.name) {
      case "execute_query":
        return await handleExecuteQuery(args);
      case "get_table_info":
        return await handleGetTableInfo(args);
      case "insert_data":
        return await handleInsertData(args);
      case "list_tables":
        return await handleListTables();
      default:
        throw new McpError(ErrorCode.MethodNotFound, `未知的工具: ${request.params.name}`);
    }
  } catch (error) {
    if (error instanceof McpError) {
      throw error;
    }
    return {
      content: [{
        type: "text" as const,
        text: JSON.stringify({
          success: false,
          error: error instanceof Error ? error.message : String(error)
        }, null, 2)
      }],
      isError: true
    };
  }
});

/**
 * Starts the server: creates the pool, checks connectivity with a trivial
 * query, then attaches the stdio transport. Exits with code 1 on failure.
 */
async function main() {
  try {
    initializeDatabase();

    // Fail fast if the database is unreachable.
    await executeQuery('SELECT 1 as test');
    // Status messages go to stderr via console.error.
    console.error('MySQL连接成功');

    const transport = new StdioServerTransport();
    await server.connect(transport);
    console.error('MySQL MCP Server 启动成功');
  } catch (error) {
    console.error('服务器启动失败:', error);
    process.exit(1);
  }
}

// Log protocol-level errors reported by the SDK.
server.onerror = (error) => console.error('[MCP Error]', error);

// Close the pool on Ctrl+C; exit even if closing fails so the handler never
// leaves an unhandled rejection behind.
process.on('SIGINT', async () => {
  try {
    if (connectionPool) {
      await connectionPool.end();
    }
  } catch (error) {
    console.error('关闭数据库连接池失败:', error);
  } finally {
    process.exit(0);
  }
});

main().catch((error) => {
  console.error("服务器错误:", error);
  process.exit(1);
});

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/SuMiaoALi/mysql-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.