Skip to main content
Glama

MCP Server for MySQL

MIT License
15
842
  • Linux
  • Apple
schema-permissions.test.ts8.31 kB
import * as mysql2 from "mysql2/promise"; import { describe, it, expect, beforeAll, afterAll, beforeEach } from "vitest"; import { executeReadOnlyQuery, executeWriteQuery, } from "../../../dist/src/db/index.js"; // Create test environment for schema-specific permissions describe("Schema-specific Permissions", () => { let pool: any; beforeAll(async () => { // Mock environment variables for schema-specific permissions process.env.ALLOW_INSERT_OPERATION = "false"; process.env.ALLOW_UPDATE_OPERATION = "false"; process.env.ALLOW_DELETE_OPERATION = "false"; process.env.ALLOW_DDL_OPERATION = "false"; // Set schema-specific permissions process.env.SCHEMA_INSERT_PERMISSIONS = "test_schema_1:true,test_schema_2:false"; process.env.SCHEMA_UPDATE_PERMISSIONS = "test_schema_1:false,test_schema_2:true"; process.env.SCHEMA_DELETE_PERMISSIONS = "test_schema_1:true,test_schema_2:false"; process.env.SCHEMA_DDL_PERMISSIONS = "test_schema_1:true,test_schema_2:false"; // Create connection pool for testing const config: any = { host: process.env.MYSQL_HOST || "127.0.0.1", port: Number(process.env.MYSQL_PORT || "3306"), user: process.env.MYSQL_USER || "root", connectionLimit: 5, multipleStatements: true, }; // Only add password if it's set if (process.env.MYSQL_PASS) { config.password = process.env.MYSQL_PASS; } pool = mysql2.createPool(config); // Create test schemas const connection = await pool.getConnection(); try { // Create test schemas await connection.query(`CREATE DATABASE IF NOT EXISTS test_schema_1`); await connection.query(`CREATE DATABASE IF NOT EXISTS test_schema_2`); // Create test tables in each schema await connection.query(` USE test_schema_1; CREATE TABLE IF NOT EXISTS test_table ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255) NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); USE test_schema_2; CREATE TABLE IF NOT EXISTS test_table ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255) NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); `); } finally { connection.release(); } }); beforeEach(async () => { // Reset test data before each test const connection = await pool.getConnection(); try { // Clear the tables in both schemas await connection.query(` USE test_schema_1; TRUNCATE TABLE test_table; INSERT INTO test_table (name) VALUES ('Schema 1 - Test 1'), ('Schema 1 - Test 2'); USE test_schema_2; TRUNCATE TABLE test_table; INSERT INTO test_table (name) VALUES ('Schema 2 - Test 1'), ('Schema 2 - Test 2'); `); } finally { connection.release(); } }); afterAll(async () => { // Clean up test schemas const connection = await pool.getConnection(); try { await connection.query(` DROP DATABASE IF EXISTS test_schema_1; DROP DATABASE IF EXISTS test_schema_2; `); } finally { connection.release(); } // Close the pool await pool.end(); // Clean up environment variables delete process.env.SCHEMA_INSERT_PERMISSIONS; delete process.env.SCHEMA_UPDATE_PERMISSIONS; delete process.env.SCHEMA_DELETE_PERMISSIONS; delete process.env.SCHEMA_DDL_PERMISSIONS; }); // Test INSERT permission for schema_1 (allowed) it("should allow INSERT operations for test_schema_1", async () => { const result = await executeWriteQuery( 'INSERT INTO test_schema_1.test_table (name) VALUES ("New Record")', ); expect(result.isError).toBe(false); expect(result.content[0].text).toContain("Insert successful"); // Verify the record was inserted const connection = await pool.getConnection(); try { const [rows] = (await connection.query( "SELECT * FROM test_schema_1.test_table WHERE name = ?", ["New Record"], )) as [any[], any]; expect(rows.length).toBe(1); } finally { connection.release(); } }); // Test INSERT permission for schema_2 (not allowed) it("should block INSERT operations for test_schema_2", async () => { const result = await executeReadOnlyQuery( 'INSERT INTO test_schema_2.test_table (name) VALUES ("Blocked Insert")', ); expect(result.isError).toBe(true); expect(result.content[0].text).toContain( "INSERT operations are not allowed for schema", ); // Verify the record was not inserted const connection = await pool.getConnection(); try { const [rows] = (await connection.query( "SELECT * FROM test_schema_2.test_table WHERE name = ?", ["Blocked Insert"], )) as [any[], any]; expect(rows.length).toBe(0); // Record should not exist } finally { connection.release(); } }); // Test UPDATE permission for schema_1 (not allowed) it("should block UPDATE operations for test_schema_1", async () => { const result = await executeReadOnlyQuery( 'UPDATE test_schema_1.test_table SET name = "Updated Name" WHERE name = "Schema 1 - Test 1"', ); expect(result.isError).toBe(true); expect(result.content[0].text).toContain( "UPDATE operations are not allowed for schema", ); // Verify the record was not updated const connection = await pool.getConnection(); try { const [rows] = (await connection.query( "SELECT * FROM test_schema_1.test_table WHERE name = ?", ["Schema 1 - Test 1"], )) as [any[], any]; expect(rows.length).toBe(1); // Original record should still exist } finally { connection.release(); } }); // Test UPDATE permission for schema_2 (allowed) it("should allow UPDATE operations for test_schema_2", async () => { const result = await executeWriteQuery( 'UPDATE test_schema_2.test_table SET name = "Updated Name" WHERE name = "Schema 2 - Test 1"', ); expect(result.isError).toBe(false); expect(result.content[0].text).toContain("Update successful"); // Verify the record was updated const connection = await pool.getConnection(); try { const [rows] = (await connection.query( "SELECT * FROM test_schema_2.test_table WHERE name = ?", ["Updated Name"], )) as [any[], any]; expect(rows.length).toBe(1); // Updated record should exist } finally { connection.release(); } }); // Test DDL permission for schema_1 (allowed) it("should allow DDL operations for test_schema_1", async () => { const result = await executeWriteQuery( "ALTER TABLE test_schema_1.test_table ADD COLUMN test_column VARCHAR(50)", ); expect(result.isError).toBe(false); // Verify the column was added const connection = await pool.getConnection(); try { const [columns] = (await connection.query( `SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = 'test_schema_1' AND TABLE_NAME = 'test_table' AND COLUMN_NAME = 'test_column'`, )) as [any[], any]; expect(columns.length).toBe(1); // Column should exist } finally { connection.release(); } }); // Test DDL permission for schema_2 (not allowed) it("should block DDL operations for test_schema_2", async () => { const result = await executeReadOnlyQuery( "ALTER TABLE test_schema_2.test_table ADD COLUMN test_column VARCHAR(50)", ); expect(result.isError).toBe(true); expect(result.content[0].text).toContain( "DDL operations are not allowed for schema", ); // Verify the column was not added const connection = await pool.getConnection(); try { const [columns] = (await connection.query( `SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = 'test_schema_2' AND TABLE_NAME = 'test_table' AND COLUMN_NAME = 'test_column'`, )) as [any[], any]; expect(columns.length).toBe(0); // Column should not exist } finally { connection.release(); } }); });

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/benborla/mcp-server-mysql'

If you have feedback or need assistance with the MCP directory API, please join our Discord server