MySQL-MCP
by zajTools
- src
#!/usr/bin/env node
import * as mysql from 'mysql2/promise';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import * as readline from 'readline';
import { createInterface } from 'readline';
import * as dotenv from 'dotenv';
import * as fs from 'fs';
import * as path from 'path';
// Load environment variables from .env file
dotenv.config();
// Validate required environment variables
function validateEnv() {
const requiredVars = ['DB_CONNECTION', 'DB_USER', 'DB_PASSWORD', 'DB_NAME'];
const missingVars = requiredVars.filter(varName => !process.env[varName]);
if (missingVars.length > 0) {
console.error('Error: Missing required environment variables:');
missingVars.forEach(varName => {
console.error(` - ${varName}`);
});
console.error('\nPlease create a .env file with these variables. See .env.example for reference.');
process.exit(1);
}
// Validate DB_CONNECTION is mysql
if (process.env.DB_CONNECTION !== 'mysql') {
console.error('Error: DB_CONNECTION must be set to "mysql"');
console.error('Please check your .env file and set DB_CONNECTION=mysql');
process.exit(1);
}
}
// Parse command line arguments (as fallback to .env)
const argv = yargs(hideBin(process.argv))
.option('host', {
type: 'string',
description: 'MySQL host',
default: process.env.DB_HOST || 'localhost',
})
.option('port', {
type: 'number',
description: 'MySQL port',
default: parseInt(process.env.DB_PORT || '3306'),
})
.option('user', {
type: 'string',
description: 'MySQL username',
default: process.env.DB_USER,
})
.option('password', {
type: 'string',
description: 'MySQL password',
default: process.env.DB_PASSWORD,
})
.option('database', {
type: 'string',
description: 'MySQL database name',
default: process.env.DB_NAME,
})
.help()
.alias('help', 'h').argv as any;
// Validate environment variables
validateEnv();
// Error codes
enum ErrorCode {
ParseError = -32700,
InvalidRequest = -32600,
MethodNotFound = -32601,
InvalidParams = -32602,
InternalError = -32603,
}
// Custom error class
class McpError extends Error {
code: ErrorCode;
constructor(code: ErrorCode, message: string) {
super(message);
this.code = code;
this.name = 'McpError';
}
}
// Business insights storage
interface Insight {
id: number;
content: string;
timestamp: Date;
}
// MCP message types
interface McpRequest {
jsonrpc: string;
id: string | number;
method: string;
params: any;
}
interface McpResponse {
jsonrpc: string;
id: string | number;
result?: any;
error?: {
code: number;
message: string;
};
}
class MySQLServer {
private pool!: mysql.Pool; // Using the definite assignment assertion
private insights: Insight[] = [];
private insightCounter = 0;
private rl: readline.Interface;
constructor() {
// Create readline interface for stdin/stdout
this.rl = createInterface({
input: process.stdin,
output: process.stdout,
terminal: false,
});
// Handle SIGINT
process.on('SIGINT', async () => {
if (this.pool) {
await this.pool.end();
}
process.exit(0);
});
}
// Initialize database and connection pool
private async initializeDatabase(): Promise<void> {
let connection: mysql.Connection | null = null;
try {
// First create a connection without specifying a database
console.error(`Connecting to MySQL server at ${argv.host}:${argv.port}...`);
try {
connection = await mysql.createConnection({
host: argv.host,
port: argv.port,
user: argv.user,
password: argv.password,
connectTimeout: 10000, // 10 seconds timeout
});
console.error('Successfully connected to MySQL server.');
} catch (err) {
const error = err as Error;
console.error('Failed to connect to MySQL server:', error.message);
if (error.message.includes('Access denied')) {
console.error('Please check your MySQL username and password in the .env file.');
} else if (error.message.includes('ECONNREFUSED')) {
console.error('Could not connect to MySQL server. Please check if the server is running and the host/port settings are correct.');
}
throw new Error(`MySQL connection failed: ${error.message}`);
}
// Check if database exists, create if it doesn't
console.error(`Checking if database '${argv.database}' exists...`);
try {
const [rows] = await connection.query(
'SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = ?',
[argv.database]
);
if (Array.isArray(rows) && rows.length === 0) {
console.error(`Database '${argv.database}' does not exist, creating it...`);
await connection.query(`CREATE DATABASE IF NOT EXISTS \`${argv.database}\``);
console.error(`Database '${argv.database}' created successfully.`);
} else {
console.error(`Database '${argv.database}' already exists.`);
}
} catch (err) {
const error = err as Error;
console.error('Failed to check/create database:', error.message);
throw new Error(`Database check/creation failed: ${error.message}`);
}
// Create the pool with the database specified
try {
this.pool = mysql.createPool({
host: argv.host,
port: argv.port,
user: argv.user,
password: argv.password,
database: argv.database,
waitForConnections: true,
connectionLimit: 10,
queueLimit: 0,
connectTimeout: 10000, // 10 seconds timeout
});
// Verify pool connection works
const [testResult] = await this.pool.query('SELECT 1 AS connection_test');
if (Array.isArray(testResult) && testResult.length > 0) {
console.error(`Connected to MySQL database: ${argv.database}`);
}
} catch (err) {
const error = err as Error;
console.error('Failed to create connection pool:', error.message);
throw new Error(`Connection pool creation failed: ${error.message}`);
}
// Run migrations
await this.runMigrations();
} catch (error) {
console.error('Failed to initialize database:', error);
throw error;
} finally {
// Close the initial connection if it was created
if (connection) {
try {
await connection.end();
} catch (err) {
console.error('Error closing initial connection:', err);
}
}
}
}
// Run migrations from setup.sql
private async runMigrations(): Promise<void> {
try {
console.error('Running migrations...');
// Check if the setup.sql file exists
const setupFilePath = path.join(process.cwd(), 'examples', 'sql', 'setup.sql');
if (!fs.existsSync(setupFilePath)) {
console.error('Migration file not found:', setupFilePath);
return;
}
// Read the setup.sql file
const setupSql = fs.readFileSync(setupFilePath, 'utf8');
// Improved SQL statement parsing that handles comments and multi-line statements better
const statements: string[] = [];
let currentStatement = '';
let inComment = false;
let inString = false;
let stringDelimiter = '';
// Process the SQL file character by character
for (let i = 0; i < setupSql.length; i++) {
const char = setupSql[i];
const nextChar = i < setupSql.length - 1 ? setupSql[i + 1] : '';
// Handle string literals
if (!inComment && (char === "'" || char === '"' || char === '`')) {
if (!inString) {
inString = true;
stringDelimiter = char;
} else if (char === stringDelimiter) {
inString = false;
}
}
// Handle comments
if (!inString) {
// Start of multi-line comment
if (char === '/' && nextChar === '*' && !inComment) {
inComment = true;
i++; // Skip the next character
continue;
}
// End of multi-line comment
if (char === '*' && nextChar === '/' && inComment) {
inComment = false;
i++; // Skip the next character
continue;
}
// Single line comment
if (char === '-' && nextChar === '-' && !inComment) {
// Skip until the end of the line
while (i < setupSql.length && setupSql[i] !== '\n') {
i++;
}
continue;
}
}
// If we're in a comment, don't add to the statement
if (inComment) {
continue;
}
// Add character to current statement
currentStatement += char;
// Check for statement termination
if (char === ';' && !inString) {
const trimmedStatement = currentStatement.trim();
if (trimmedStatement.length > 0) {
statements.push(trimmedStatement);
}
currentStatement = '';
}
}
// Add the last statement if it doesn't end with a semicolon
const trimmedLastStatement = currentStatement.trim();
if (trimmedLastStatement.length > 0) {
statements.push(trimmedLastStatement);
}
// Execute each statement
for (const statement of statements) {
// Skip CREATE DATABASE and USE statements as we've already handled the database creation
if (statement.toLowerCase().includes('create database') ||
statement.toLowerCase().includes('use ')) {
continue;
}
try {
await this.pool.query(statement);
console.error(`Executed migration: ${statement.substring(0, 50)}...`);
} catch (error) {
console.error(`Error executing migration: ${statement.substring(0, 100)}...`);
console.error(error);
// Continue with other migrations even if one fails
}
}
console.error('Migrations completed successfully.');
} catch (error) {
console.error('Error running migrations:', error);
throw error;
}
}
// Handle MCP requests
private async handleRequest(request: McpRequest): Promise<McpResponse> {
try {
switch (request.method) {
case 'mcp.listTools':
return {
jsonrpc: '2.0',
id: request.id,
result: {
tools: [
{
name: 'read_query',
description: 'Execute a SELECT query to read data from the database',
inputSchema: {
type: 'object',
properties: {
query: {
type: 'string',
description: 'The SELECT SQL query to execute',
},
},
required: ['query'],
},
},
{
name: 'write_query',
description: 'Execute an INSERT, UPDATE, or DELETE query',
inputSchema: {
type: 'object',
properties: {
query: {
type: 'string',
description: 'The SQL modification query',
},
},
required: ['query'],
},
},
{
name: 'create_table',
description: 'Create a new table in the database',
inputSchema: {
type: 'object',
properties: {
query: {
type: 'string',
description: 'The CREATE TABLE SQL statement',
},
},
required: ['query'],
},
},
{
name: 'list_tables',
description: 'Get a list of all tables in the database',
inputSchema: {
type: 'object',
properties: {},
},
},
{
name: 'describe_table',
description: 'View schema information for a specific table',
inputSchema: {
type: 'object',
properties: {
table_name: {
type: 'string',
description: 'Name of table to describe',
},
},
required: ['table_name'],
},
},
{
name: 'append_insight',
description: 'Add a new business insight to the memo',
inputSchema: {
type: 'object',
properties: {
insight: {
type: 'string',
description: 'Business insight discovered from data analysis',
},
},
required: ['insight'],
},
},
],
},
};
case 'mcp.listResources':
return {
jsonrpc: '2.0',
id: request.id,
result: {
resources: [
{
uri: 'memo://insights',
name: 'Business Insights Memo',
mimeType: 'text/plain',
description: 'A continuously updated memo of business insights discovered during analysis',
},
],
},
};
case 'mcp.listResourceTemplates':
return {
jsonrpc: '2.0',
id: request.id,
result: {
resourceTemplates: [],
},
};
case 'mcp.readResource':
if (request.params.uri === 'memo://insights') {
// Generate insights memo
let memoText = '# Business Insights Memo\n\n';
if (this.insights.length === 0) {
memoText += 'No insights have been discovered yet.\n';
} else {
this.insights.forEach((insight) => {
memoText += `## Insight ${insight.id}\n`;
memoText += `*${insight.timestamp.toISOString()}*\n\n`;
memoText += `${insight.content}\n\n`;
});
}
return {
jsonrpc: '2.0',
id: request.id,
result: {
contents: [
{
uri: request.params.uri,
mimeType: 'text/plain',
text: memoText,
},
],
},
};
} else {
throw new McpError(
ErrorCode.InvalidRequest,
`Unknown resource: ${request.params.uri}`
);
}
case 'mcp.callTool':
return await this.handleToolCall(request);
default:
throw new McpError(
ErrorCode.MethodNotFound,
`Unknown method: ${request.method}`
);
}
} catch (error) {
if (error instanceof McpError) {
return {
jsonrpc: '2.0',
id: request.id,
error: {
code: error.code,
message: error.message,
},
};
}
console.error('Error handling request:', error);
return {
jsonrpc: '2.0',
id: request.id,
error: {
code: ErrorCode.InternalError,
message: (error as Error).message,
},
};
}
}
// Validate SQL query
private validateSqlQuery(query: string, expectedType: 'select' | 'write' | 'create_table'): void {
if (!query || typeof query !== 'string') {
throw new McpError(
ErrorCode.InvalidParams,
'Query must be a non-empty string'
);
}
const trimmedQuery = query.trim().toLowerCase();
switch (expectedType) {
case 'select':
if (!trimmedQuery.startsWith('select')) {
throw new McpError(
ErrorCode.InvalidParams,
'Only SELECT queries are allowed with read_query'
);
}
// Check for potentially harmful operations
if (
trimmedQuery.includes('drop ') ||
trimmedQuery.includes('delete ') ||
trimmedQuery.includes('update ') ||
trimmedQuery.includes('insert ') ||
trimmedQuery.includes('alter ') ||
trimmedQuery.includes('create ')
) {
throw new McpError(
ErrorCode.InvalidParams,
'The query contains potentially harmful operations. Only SELECT statements are allowed with read_query.'
);
}
break;
case 'write':
if (trimmedQuery.startsWith('select')) {
throw new McpError(
ErrorCode.InvalidParams,
'SELECT queries are not allowed with write_query, use read_query instead'
);
}
// Check for allowed operations
if (
!trimmedQuery.startsWith('insert ') &&
!trimmedQuery.startsWith('update ') &&
!trimmedQuery.startsWith('delete ')
) {
throw new McpError(
ErrorCode.InvalidParams,
'Only INSERT, UPDATE, and DELETE operations are allowed with write_query'
);
}
// Check for potentially harmful operations
if (
trimmedQuery.includes('drop ') ||
trimmedQuery.includes('alter ') ||
trimmedQuery.includes('create ')
) {
throw new McpError(
ErrorCode.InvalidParams,
'The query contains potentially harmful operations. Only INSERT, UPDATE, and DELETE are allowed with write_query.'
);
}
break;
case 'create_table':
if (!trimmedQuery.startsWith('create table')) {
throw new McpError(
ErrorCode.InvalidParams,
'Only CREATE TABLE statements are allowed with create_table'
);
}
break;
}
}
// Handle tool calls
private async handleToolCall(request: McpRequest): Promise<McpResponse> {
try {
switch (request.params.name) {
case 'read_query': {
// Check if arguments exist and have the expected structure
if (!request.params.arguments || typeof request.params.arguments !== 'object') {
throw new McpError(
ErrorCode.InvalidParams,
'Invalid arguments: Expected an object with a "query" property'
);
}
const { query } = request.params.arguments as { query: string };
// Validate query
this.validateSqlQuery(query, 'select');
try {
const [rows] = await this.pool.query(query);
return {
jsonrpc: '2.0',
id: request.id,
result: {
content: [
{
type: 'text',
text: JSON.stringify(rows, null, 2),
},
],
},
};
} catch (err) {
const error = err as Error;
console.error('Error executing read query:', error.message);
throw new McpError(
ErrorCode.InternalError,
`Database error: ${error.message}`
);
}
}
case 'write_query': {
// Check if arguments exist and have the expected structure
if (!request.params.arguments || typeof request.params.arguments !== 'object') {
throw new McpError(
ErrorCode.InvalidParams,
'Invalid arguments: Expected an object with a "query" property'
);
}
const { query } = request.params.arguments as { query: string };
// Validate query
this.validateSqlQuery(query, 'write');
try {
const [result] = await this.pool.query(query);
return {
jsonrpc: '2.0',
id: request.id,
result: {
content: [
{
type: 'text',
text: JSON.stringify({ affected_rows: (result as any).affectedRows }, null, 2),
},
],
},
};
} catch (err) {
const error = err as Error;
console.error('Error executing write query:', error.message);
throw new McpError(
ErrorCode.InternalError,
`Database error: ${error.message}`
);
}
}
case 'create_table': {
// Check if arguments exist and have the expected structure
if (!request.params.arguments || typeof request.params.arguments !== 'object') {
throw new McpError(
ErrorCode.InvalidParams,
'Invalid arguments: Expected an object with a "query" property'
);
}
const { query } = request.params.arguments as { query: string };
// Validate query
this.validateSqlQuery(query, 'create_table');
try {
await this.pool.query(query);
return {
jsonrpc: '2.0',
id: request.id,
result: {
content: [
{
type: 'text',
text: 'Table created successfully',
},
],
},
};
} catch (err) {
const error = err as Error;
console.error('Error creating table:', error.message);
throw new McpError(
ErrorCode.InternalError,
`Database error: ${error.message}`
);
}
}
case 'list_tables': {
const [rows] = await this.pool.query(
'SELECT table_name FROM information_schema.tables WHERE table_schema = ?',
[argv.database]
);
const tables = (rows as any[]).map((row) => row.table_name || row.TABLE_NAME);
return {
jsonrpc: '2.0',
id: request.id,
result: {
content: [
{
type: 'text',
text: JSON.stringify(tables, null, 2),
},
],
},
};
}
case 'describe_table': {
const { table_name } = request.params.arguments as { table_name: string };
const [rows] = await this.pool.query(
'SELECT column_name, data_type, is_nullable, column_default ' +
'FROM information_schema.columns ' +
'WHERE table_schema = ? AND table_name = ?',
[argv.database, table_name]
);
return {
jsonrpc: '2.0',
id: request.id,
result: {
content: [
{
type: 'text',
text: JSON.stringify(rows, null, 2),
},
],
},
};
}
case 'append_insight': {
const { insight } = request.params.arguments as { insight: string };
this.insightCounter++;
const newInsight: Insight = {
id: this.insightCounter,
content: insight,
timestamp: new Date(),
};
this.insights.push(newInsight);
return {
jsonrpc: '2.0',
id: request.id,
result: {
content: [
{
type: 'text',
text: `Insight #${newInsight.id} added to memo`,
},
],
},
};
}
default:
throw new McpError(
ErrorCode.MethodNotFound,
`Unknown tool: ${request.params.name}`
);
}
} catch (error) {
if (error instanceof McpError) {
throw error;
}
console.error('Error executing tool:', error);
return {
jsonrpc: '2.0',
id: request.id,
result: {
content: [
{
type: 'text',
text: `Error: ${(error as Error).message}`,
},
],
isError: true,
},
};
}
}
// Start the server
async run() {
try {
// Initialize database and connection pool
await this.initializeDatabase();
// Start reading from stdin
this.rl.on('line', async (line) => {
try {
// Parse the request
const request = JSON.parse(line) as McpRequest;
// Handle the request
const response = await this.handleRequest(request);
// Send the response
console.log(JSON.stringify(response));
} catch (error) {
console.error('Error processing request:', error);
// Send error response
const response: McpResponse = {
jsonrpc: '2.0',
id: 'unknown',
error: {
code: ErrorCode.ParseError,
message: (error as Error).message,
},
};
console.log(JSON.stringify(response));
}
});
console.error('MySQL MCP server running on stdio');
} catch (error) {
console.error('Failed to start MySQL MCP server:', error);
process.exit(1);
}
}
}
const server = new MySQLServer();
server.run().catch(console.error);