index.ts•13.3 kB
#!/usr/bin/env node
/**
* MySQL MCP Server
* 提供MySQL数据库连接和查询功能,包括:
* - 执行SQL查询
* - 获取数据库表信息
* - 获取数据库结构信息
*/
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
CallToolRequestSchema,
ListResourcesRequestSchema,
ListToolsRequestSchema,
ReadResourceRequestSchema,
ErrorCode,
McpError,
} from "@modelcontextprotocol/sdk/types.js";
import mysql from 'mysql2/promise';
// MySQL连接配置
const MYSQL_CONFIG = {
host: process.env.MYSQL_HOST || 'localhost',
port: parseInt(process.env.MYSQL_PORT || '3306'),
user: process.env.MYSQL_USER || 'root',
password: process.env.MYSQL_PASSWORD || '',
database: process.env.MYSQL_DB || 'mysql',
charset: 'utf8mb4',
timezone: process.env.MYSQL_TIMEZONE || '+00:00'
};
// 数据库连接池
let connectionPool: mysql.Pool;
/**
* 初始化数据库连接池
*/
function initializeDatabase() {
connectionPool = mysql.createPool({
...MYSQL_CONFIG,
waitForConnections: true,
connectionLimit: 10,
queueLimit: 0,
});
}
/**
* 执行SQL查询
*/
async function executeQuery(sql: string, params: any[] = []): Promise<any> {
try {
const [rows] = await connectionPool.execute(sql, params);
return rows;
} catch (error) {
console.error('SQL执行错误:', error);
throw new McpError(
ErrorCode.InternalError,
`SQL执行失败: ${error instanceof Error ? error.message : String(error)}`
);
}
}
/**
* 获取数据库表信息
*/
async function getTablesInfo(): Promise<any[]> {
const sql = `
SELECT
TABLE_NAME as table_name,
TABLE_TYPE as table_type,
ENGINE as engine,
TABLE_ROWS as table_rows,
TABLE_COMMENT as table_comment
FROM information_schema.TABLES
WHERE TABLE_SCHEMA = ?
ORDER BY TABLE_NAME
`;
return await executeQuery(sql, [MYSQL_CONFIG.database]);
}
/**
* 获取表结构信息
*/
async function getTableStructure(tableName: string): Promise<any[]> {
const sql = `
SELECT
COLUMN_NAME as column_name,
DATA_TYPE as data_type,
IS_NULLABLE as is_nullable,
COLUMN_DEFAULT as column_default,
COLUMN_COMMENT as column_comment,
COLUMN_KEY as column_key
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?
ORDER BY ORDINAL_POSITION
`;
return await executeQuery(sql, [MYSQL_CONFIG.database, tableName]);
}
/**
* 创建MCP服务器
*/
const server = new Server(
{
name: "mysql-mcp-server",
version: "0.1.0",
},
{
capabilities: {
resources: {},
tools: {},
},
}
);
/**
* 资源处理器 - 列出可用的数据库资源
*/
server.setRequestHandler(ListResourcesRequestSchema, async () => {
try {
const tables = await getTablesInfo();
const resources = [
{
uri: "mysql://database/info",
mimeType: "application/json",
name: "数据库信息",
description: `${MYSQL_CONFIG.database} 数据库的基本信息和表列表`
},
...tables.map(table => ({
uri: `mysql://table/${table.table_name}`,
mimeType: "application/json",
name: `表: ${table.table_name}`,
description: `${table.table_name} 表的结构信息 ${table.table_comment ? `- ${table.table_comment}` : ''}`
}))
];
return { resources };
} catch (error) {
console.error('获取资源列表失败:', error);
return { resources: [] };
}
});
/**
* 资源读取处理器
*/
server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
const url = new URL(request.params.uri);
try {
if (url.pathname === '/database/info' || url.pathname === '/info') {
// 返回数据库信息
const tables = await getTablesInfo();
const dbInfo = {
database: MYSQL_CONFIG.database,
host: MYSQL_CONFIG.host,
port: MYSQL_CONFIG.port,
table_count: tables.length,
tables: tables
};
return {
contents: [{
uri: request.params.uri,
mimeType: "application/json",
text: JSON.stringify(dbInfo, null, 2)
}]
};
} else if (url.pathname.startsWith('/table/')) {
// 返回表结构信息
const tableName = url.pathname.replace('/table/', '');
const structure = await getTableStructure(tableName);
return {
contents: [{
uri: request.params.uri,
mimeType: "application/json",
text: JSON.stringify({
table_name: tableName,
columns: structure
}, null, 2)
}]
};
} else {
throw new McpError(ErrorCode.InvalidRequest, `未知的资源路径: ${url.pathname}`);
}
} catch (error) {
if (error instanceof McpError) {
throw error;
}
throw new McpError(
ErrorCode.InternalError,
`读取资源失败: ${error instanceof Error ? error.message : String(error)}`
);
}
});
/**
* 工具列表处理器
*/
server.setRequestHandler(ListToolsRequestSchema, async () => {
return {
tools: [
{
name: "execute_query",
description: "执行SQL查询语句(支持SELECT、INSERT、SHOW、DESCRIBE)",
inputSchema: {
type: "object",
properties: {
sql: {
type: "string",
description: "要执行的SQL语句(支持SELECT、INSERT、SHOW、DESCRIBE)"
},
limit: {
type: "number",
description: "限制返回的行数(默认100,最大1000,仅对SELECT有效)",
minimum: 1,
maximum: 1000,
default: 100
}
},
required: ["sql"]
}
},
{
name: "get_table_info",
description: "获取指定表的详细信息和结构",
inputSchema: {
type: "object",
properties: {
table_name: {
type: "string",
description: "表名"
}
},
required: ["table_name"]
}
},
{
name: "insert_data",
description: "向指定表插入数据(便捷的INSERT操作)",
inputSchema: {
type: "object",
properties: {
table_name: {
type: "string",
description: "目标表名"
},
data: {
type: "object",
description: "要插入的数据,键值对形式,键为列名,值为对应的数据"
}
},
required: ["table_name", "data"]
}
},
{
name: "list_tables",
description: "列出数据库中的所有表",
inputSchema: {
type: "object",
properties: {},
additionalProperties: false
}
}
]
};
});
/**
* 工具调用处理器
*/
server.setRequestHandler(CallToolRequestSchema, async (request) => {
try {
switch (request.params.name) {
case "execute_query": {
const sql = String(request.params.arguments?.sql || '').trim();
const limit = Number(request.params.arguments?.limit || 100);
if (!sql) {
throw new McpError(ErrorCode.InvalidParams, "SQL语句不能为空");
}
// 安全检查:只允许安全的数据库操作
const sqlUpper = sql.toUpperCase().trim();
const allowedOperations = ['SELECT', 'INSERT', 'SHOW', 'DESCRIBE', 'DESC'];
const isAllowed = allowedOperations.some(op => sqlUpper.startsWith(op));
if (!isAllowed) {
throw new McpError(ErrorCode.InvalidParams, "仅支持SELECT、INSERT、SHOW、DESCRIBE操作");
}
// 对SELECT查询添加LIMIT限制
let finalSql = sql;
if (sqlUpper.startsWith('SELECT') && !sqlUpper.includes('LIMIT')) {
finalSql += ` LIMIT ${Math.min(limit, 1000)}`;
}
const startTime = Date.now();
const results = await executeQuery(finalSql);
const executionTime = (Date.now() - startTime) / 1000;
// 处理不同类型的SQL操作结果
let responseData: any = {
success: true,
sql: finalSql,
execution_time: executionTime
};
if (sqlUpper.startsWith('SELECT') || sqlUpper.startsWith('SHOW') || sqlUpper.startsWith('DESCRIBE') || sqlUpper.startsWith('DESC')) {
// 查询操作:返回数据和行数
responseData.row_count = Array.isArray(results) ? results.length : 0;
responseData.data = results;
} else if (sqlUpper.startsWith('INSERT')) {
// 插入操作:返回影响行数和插入ID
responseData.affected_rows = results.affectedRows || 0;
responseData.insert_id = results.insertId || null;
responseData.message = `成功插入 ${results.affectedRows || 0} 行数据`;
} else {
// 其他操作
responseData.result = results;
}
return {
content: [{
type: "text",
text: JSON.stringify(responseData, null, 2)
}]
};
}
case "get_table_info": {
const tableName = String(request.params.arguments?.table_name || '');
if (!tableName) {
throw new McpError(ErrorCode.InvalidParams, "表名不能为空");
}
const structure = await getTableStructure(tableName);
if (structure.length === 0) {
throw new McpError(ErrorCode.InvalidParams, `表 ${tableName} 不存在`);
}
return {
content: [{
type: "text",
text: JSON.stringify({
table_name: tableName,
column_count: structure.length,
columns: structure
}, null, 2)
}]
};
}
case "insert_data": {
const tableName = String(request.params.arguments?.table_name || '');
const data = request.params.arguments?.data;
if (!tableName) {
throw new McpError(ErrorCode.InvalidParams, "表名不能为空");
}
if (!data || typeof data !== 'object' || Object.keys(data).length === 0) {
throw new McpError(ErrorCode.InvalidParams, "插入数据不能为空");
}
// 构建INSERT语句
const columns = Object.keys(data);
const values = Object.values(data);
const placeholders = columns.map(() => '?').join(', ');
const columnsStr = columns.join(', ');
const insertSql = `INSERT INTO ${tableName} (${columnsStr}) VALUES (${placeholders})`;
const startTime = Date.now();
const results = await executeQuery(insertSql, values);
const executionTime = (Date.now() - startTime) / 1000;
return {
content: [{
type: "text",
text: JSON.stringify({
success: true,
sql: insertSql,
table_name: tableName,
inserted_data: data,
affected_rows: results.affectedRows || 0,
insert_id: results.insertId || null,
execution_time: executionTime,
message: `成功向表 ${tableName} 插入 ${results.affectedRows || 0} 行数据`
}, null, 2)
}]
};
}
case "list_tables": {
const tables = await getTablesInfo();
return {
content: [{
type: "text",
text: JSON.stringify({
database: MYSQL_CONFIG.database,
table_count: tables.length,
tables: tables
}, null, 2)
}]
};
}
default:
throw new McpError(ErrorCode.MethodNotFound, `未知的工具: ${request.params.name}`);
}
} catch (error) {
if (error instanceof McpError) {
throw error;
}
return {
content: [{
type: "text",
text: JSON.stringify({
success: false,
error: error instanceof Error ? error.message : String(error)
}, null, 2)
}],
isError: true
};
}
});
/**
* 启动服务器
*/
async function main() {
try {
// 初始化数据库连接
initializeDatabase();
// 测试数据库连接
await executeQuery('SELECT 1 as test');
console.error('MySQL连接成功');
// 启动MCP服务器
const transport = new StdioServerTransport();
await server.connect(transport);
console.error('MySQL MCP Server 启动成功');
} catch (error) {
console.error('服务器启动失败:', error);
process.exit(1);
}
}
// 错误处理
server.onerror = (error) => console.error('[MCP Error]', error);
process.on('SIGINT', async () => {
if (connectionPool) {
await connectionPool.end();
}
process.exit(0);
});
main().catch((error) => {
console.error("服务器错误:", error);
process.exit(1);
});