#!/usr/bin/env node
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
CallToolRequestSchema,
ListToolsRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import mysql from "mysql2/promise";
import { z } from "zod";
/**
 * Parses a TCP port number read from an environment variable.
 *
 * @param raw - Text to parse; leading digits are read in base 10.
 * @param variableName - Name of the variable, used only in the error message.
 * @returns The parsed port.
 * @throws Error when the text does not yield an integer between 1 and 65535,
 *   so a bad setting is reported at startup instead of as a NaN port later.
 */
function parsePort(raw: string, variableName: string): number {
  const port = parseInt(raw, 10);
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(
      `Invalid ${variableName}: expected a port between 1 and 65535, got ${JSON.stringify(raw)}`
    );
  }
  return port;
}

// Configuration from environment. Unset or empty variables fall back to the
// defaults shown here.
const config = {
  host: process.env.MINDSDB_HOST || "127.0.0.1",
  port: parsePort(process.env.MINDSDB_PORT || "47335", "MINDSDB_PORT"),
  user: process.env.MINDSDB_USER || "mindsdb",
  password: process.env.MINDSDB_PASSWORD || "",
};
// Shared connection pool; stays null until the first getPool() call.
let pool: mysql.Pool | null = null;

/**
 * Returns the module-wide connection pool, creating it from `config` on the
 * first call and handing back the same instance afterwards.
 */
function getPool(): mysql.Pool {
  if (pool !== null) {
    return pool;
  }
  const { host, port, user, password } = config;
  pool = mysql.createPool({
    host,
    port,
    user,
    password,
    waitForConnections: true,
    connectionLimit: 5,
    queueLimit: 0,
    enableKeepAlive: true,
    keepAliveInitialDelay: 10000,
  });
  return pool;
}
/**
 * Runs one SQL statement on a pooled connection and returns the result as a
 * table of column names and row value arrays.
 *
 * @param sql - Statement text; sent to the server unchanged.
 * @param database - Optional database selected with `USE` before `sql` runs.
 * @returns `columns` in result order and one array of values per row, aligned
 *   with `columns`. A statement that produces no result set yields empty
 *   `columns` and `rows`.
 */
async function executeQuery(
  sql: string,
  database?: string
): Promise<{ columns: string[]; rows: unknown[][] }> {
  const conn = await getPool().getConnection();
  // True once a USE has been attempted on this connection.
  let contextSwitched = false;
  try {
    if (database) {
      contextSwitched = true;
      // Double any backtick so the name cannot terminate the quoted identifier.
      await conn.query(`USE \`${database.replace(/`/g, "``")}\``);
    }
    // rowsAsArray keeps values positional, so columns that share a name
    // (e.g. a.id and b.id in a join) do not overwrite each other.
    const [result, fields] = await conn.query({ sql, rowsAsArray: true });
    if (!Array.isArray(result) || !Array.isArray(fields)) {
      // No result set (e.g. INSERT / CREATE): nothing tabular to report.
      return { columns: [], rows: [] };
    }
    const columns = (fields as mysql.FieldPacket[]).map((field) => field.name);
    return { columns, rows: result as unknown[][] };
  } finally {
    if (contextSwitched) {
      // The selected database is session state; returning this connection to
      // the pool would leak it into later calls that pass no database.
      conn.destroy();
    } else {
      conn.release();
    }
  }
}
// Tool schemas using Zod. Each schema validates the arguments of one tool
// before its handler runs. Numeric fields that the handlers interpolate into
// SQL text are constrained here so fractional, negative or infinite values
// are rejected up front instead of producing malformed statements.

// Arguments for the "query" tool.
const QuerySchema = z.object({
  sql: z.string().describe("SQL query to execute"),
  database: z.string().optional().describe("Database context to use"),
});

// Arguments for the "list_tables" tool.
const ListTablesSchema = z.object({
  database: z.string().describe("Database to list tables from"),
});

// Arguments for the "describe" tool.
const DescribeSchema = z.object({
  name: z.string().describe("Name of model, table, or knowledge base to describe"),
  database: z.string().optional().describe("Database context"),
});

// Arguments for the "search_kb" tool.
const HybridSearchKbSchema = z.object({
  kb_name: z.string().describe("Knowledge base name"),
  question: z.string().describe("Search query"),
  limit: z
    .number()
    .int()
    .positive()
    .optional()
    .default(10)
    .describe("Max results to return"),
  alpha: z
    .number()
    .min(0)
    .max(1)
    .optional()
    .default(0.5)
    .describe("Hybrid search alpha (0=keyword, 1=semantic, 0.5=balanced)"),
  min_relevance: z
    .number()
    .min(0)
    .max(1)
    .optional()
    .default(0)
    .describe("Minimum relevance score (0-1)"),
  metadata_filter: z
    .record(z.string())
    .optional()
    .describe("Metadata field filters"),
});

// Arguments for the "insert_kb" tool.
const InsertKbSchema = z.object({
  kb_name: z.string().describe("Knowledge base name"),
  source_query: z
    .string()
    .describe("SELECT query for source data (e.g., SELECT id, content FROM mydb.docs)"),
  batch_size: z
    .number()
    .int()
    .positive()
    .optional()
    .default(1000)
    .describe("Batch size for insertion"),
  threads: z
    .number()
    .int()
    .positive()
    .optional()
    .default(1)
    .describe("Number of threads"),
  skip_existing: z.boolean().optional().default(false).describe("Skip existing records"),
});

// Arguments for the "create_kb" tool.
const CreateKbSchema = z.object({
  name: z.string().describe("Knowledge base name"),
  embedding_model: z
    .object({
      provider: z
        .string()
        .describe("Provider: openai, azure, google, ollama"),
      model_name: z.string().describe("Model name"),
      api_key: z.string().optional().describe("API key if required"),
    })
    .describe("Embedding model configuration"),
  content_columns: z.array(z.string()).describe("Columns to embed"),
  metadata_columns: z.array(z.string()).optional().describe("Metadata columns"),
  id_column: z.string().optional().describe("ID column name"),
  reranking_model: z
    .object({
      provider: z.string().describe("Reranking provider"),
      model_name: z.string().describe("Reranking model name"),
      api_key: z.string().optional().describe("API key if required"),
    })
    .optional()
    .describe("Optional reranking model"),
  storage: z.string().optional().default("chromadb").describe("Vector storage backend"),
});

// Arguments for the "create_job" tool.
const CreateJobSchema = z.object({
  name: z.string().describe("Job name"),
  query: z.string().describe("SQL query to execute"),
  every: z.string().describe("Interval (e.g., '1 hour', '1 day', '15 minute')"),
  start: z.string().optional().describe("Start datetime (ISO format)"),
  end: z.string().optional().describe("End datetime (ISO format)"),
});

// Arguments for the "connect_database" tool.
const ConnectDatabaseSchema = z.object({
  name: z.string().describe("Database connection name"),
  engine: z.string().describe("Database engine (postgres, mysql, mongodb, etc.)"),
  parameters: z
    .object({
      host: z.string().describe("Database host"),
      port: z.number().int().min(1).max(65535).describe("Database port"),
      database: z.string().describe("Database name"),
      user: z.string().describe("Username"),
      password: z.string().describe("Password"),
    })
    .describe("Connection parameters"),
});
// Tool definitions advertised to MCP clients. Each entry carries the tool
// name, a description, and a JSON Schema for its arguments. The schemas are
// kept in step with the Zod validation applied when a tool is called:
// integer-valued fields are declared as "integer", metadata filter values as
// strings, and the reranking model lists the fields it requires.
const tools = [
  {
    name: "query",
    description:
      "Execute any MindsDB SQL query. Use this for custom queries not covered by other tools.",
    inputSchema: {
      type: "object" as const,
      properties: {
        sql: { type: "string", description: "SQL query to execute" },
        database: { type: "string", description: "Database context to use" },
      },
      required: ["sql"],
    },
  },
  {
    name: "list_databases",
    description: "List all databases/data sources in MindsDB.",
    inputSchema: {
      type: "object" as const,
      properties: {},
      required: [],
    },
  },
  {
    name: "list_knowledge_bases",
    description: "List all knowledge bases in MindsDB.",
    inputSchema: {
      type: "object" as const,
      properties: {},
      required: [],
    },
  },
  {
    name: "describe",
    description:
      "Describe a model, table, or knowledge base. Returns column information and structure.",
    inputSchema: {
      type: "object" as const,
      properties: {
        name: {
          type: "string",
          description: "Name of model, table, or knowledge base to describe",
        },
        database: { type: "string", description: "Database context" },
      },
      required: ["name"],
    },
  },
  {
    name: "search_kb",
    description:
      "Search a knowledge base using hybrid semantic + keyword search. Returns relevant chunks with relevance scores.",
    inputSchema: {
      type: "object" as const,
      properties: {
        kb_name: { type: "string", description: "Knowledge base name" },
        question: { type: "string", description: "Search query" },
        limit: { type: "integer", description: "Max results (default: 10)" },
        alpha: {
          type: "number",
          description: "Hybrid alpha: 0=keyword, 1=semantic, 0.5=balanced (default: 0.5)",
        },
        min_relevance: {
          type: "number",
          description: "Min relevance score 0-1 (default: 0)",
        },
        metadata_filter: {
          type: "object",
          description: "Metadata field filters as key-value pairs",
          // Call-time validation accepts string values only.
          additionalProperties: { type: "string" },
        },
      },
      required: ["kb_name", "question"],
    },
  },
  {
    name: "insert_kb",
    description:
      "Insert data into a knowledge base from a source query. Supports batch processing.",
    inputSchema: {
      type: "object" as const,
      properties: {
        kb_name: { type: "string", description: "Knowledge base name" },
        source_query: {
          type: "string",
          description: "SELECT query for source data",
        },
        batch_size: { type: "integer", description: "Batch size (default: 1000)" },
        threads: { type: "integer", description: "Thread count (default: 1)" },
        skip_existing: {
          type: "boolean",
          description: "Skip existing records (default: false)",
        },
      },
      required: ["kb_name", "source_query"],
    },
  },
  {
    name: "create_kb",
    description:
      "Create a new knowledge base with embedding model configuration.",
    inputSchema: {
      type: "object" as const,
      properties: {
        name: { type: "string", description: "Knowledge base name" },
        embedding_model: {
          type: "object",
          description: "Embedding model config with provider, model_name, api_key",
          properties: {
            provider: { type: "string", description: "Provider: openai, azure, google, ollama" },
            model_name: { type: "string", description: "Model name" },
            api_key: { type: "string", description: "API key if required" },
          },
          required: ["provider", "model_name"],
        },
        content_columns: {
          type: "array",
          items: { type: "string" },
          description: "Columns to embed",
        },
        metadata_columns: {
          type: "array",
          items: { type: "string" },
          description: "Metadata columns",
        },
        id_column: { type: "string", description: "ID column name" },
        reranking_model: {
          type: "object",
          description: "Optional reranking model config",
          properties: {
            provider: { type: "string", description: "Reranking provider" },
            model_name: { type: "string", description: "Reranking model name" },
            api_key: { type: "string", description: "API key if required" },
          },
          // When the object is supplied, validation requires both fields.
          required: ["provider", "model_name"],
        },
        storage: { type: "string", description: "Vector storage backend (default: chromadb)" },
      },
      required: ["name", "embedding_model", "content_columns"],
    },
  },
  {
    name: "list_jobs",
    description: "List all scheduled jobs in MindsDB.",
    inputSchema: {
      type: "object" as const,
      properties: {},
      required: [],
    },
  },
  {
    name: "create_job",
    description: "Create a scheduled job to run a query at regular intervals.",
    inputSchema: {
      type: "object" as const,
      properties: {
        name: { type: "string", description: "Job name" },
        query: { type: "string", description: "SQL query to execute" },
        every: {
          type: "string",
          description: "Interval (e.g., '1 hour', '1 day', '15 minute')",
        },
        start: { type: "string", description: "Start datetime (ISO format)" },
        end: { type: "string", description: "End datetime (ISO format)" },
      },
      required: ["name", "query", "every"],
    },
  },
  {
    name: "list_tables",
    description: "List all tables in a specific database.",
    inputSchema: {
      type: "object" as const,
      properties: {
        database: { type: "string", description: "Database to list tables from" },
      },
      required: ["database"],
    },
  },
  {
    name: "connect_database",
    description: "Connect an external database as a data source in MindsDB.",
    inputSchema: {
      type: "object" as const,
      properties: {
        name: { type: "string", description: "Database connection name" },
        engine: {
          type: "string",
          description: "Database engine (postgres, mysql, mongodb, etc.)",
        },
        parameters: {
          type: "object",
          description: "Connection parameters",
          properties: {
            host: { type: "string", description: "Database host" },
            port: { type: "integer", description: "Database port" },
            database: { type: "string", description: "Database name" },
            user: { type: "string", description: "Username" },
            password: { type: "string", description: "Password" },
          },
          required: ["host", "port", "database", "user", "password"],
        },
      },
      required: ["name", "engine", "parameters"],
    },
  },
];
// Tool handlers
/**
 * Handler for the "query" tool: runs the caller's SQL, optionally in the given
 * database context, and returns the tabular result tagged with `type: "ok"`.
 */
async function handleQuery(args: z.infer<typeof QuerySchema>) {
  const { sql, database } = args;
  const table = await executeQuery(sql, database);
  return { type: "ok" as const, ...table };
}
/**
 * Handler for the "list_databases" tool: runs SHOW DATABASES and returns the
 * first column of every row as the list of database names.
 */
async function handleListDatabases() {
  const { rows } = await executeQuery("SHOW DATABASES");
  const databases: string[] = [];
  for (const row of rows) {
    databases.push(row[0] as string);
  }
  return { databases };
}
/**
 * Handler for the "list_knowledge_bases" tool: selects every row of
 * information_schema.knowledge_bases and returns each as an object keyed by
 * column name.
 */
async function handleListKnowledgeBases() {
  const { columns, rows } = await executeQuery(
    "SELECT * FROM information_schema.knowledge_bases"
  );
  const knowledge_bases = rows.map((values) => {
    const record: Record<string, unknown> = {};
    for (const [index, column] of columns.entries()) {
      record[column] = values[index];
    }
    return record;
  });
  return { knowledge_bases };
}
/**
 * Handler for the "describe" tool: runs `DESCRIBE <name>` (optionally in the
 * given database context) and returns each result row as an object keyed by
 * column name.
 *
 * @param args - `name` of the object to describe and an optional `database`.
 * @returns `{ description }`, one object per result row.
 * @throws Error when `name` is not a dot-separated path of plain or
 *   backtick-quoted identifiers. The name is spliced into the statement text,
 *   so anything else could inject additional SQL.
 */
async function handleDescribe(args: z.infer<typeof DescribeSchema>) {
  // A segment is either letters/digits/_/$ or any backtick-free text wrapped
  // in backticks; segments are joined with dots.
  const identifierPath =
    /^(?:[\p{L}\p{N}_$]+|`[^`]+`)(?:\.(?:[\p{L}\p{N}_$]+|`[^`]+`))*$/u;
  if (!identifierPath.test(args.name)) {
    throw new Error(
      `Invalid name: ${JSON.stringify(args.name)} is not a plain or backtick-quoted identifier`
    );
  }
  const { columns, rows } = await executeQuery(`DESCRIBE ${args.name}`, args.database);
  const description = rows.map((values) => {
    const record: Record<string, unknown> = {};
    columns.forEach((column, index) => {
      record[column] = values[index];
    });
    return record;
  });
  return { description };
}
/**
 * Handler for the "search_kb" tool: builds and runs a SELECT against a
 * knowledge base and returns the matching chunks.
 *
 * The WHERE clause always contains `content = '<question>'`. It adds
 * `hybrid_search_alpha` only when `alpha` differs from 0.5, `relevance >=`
 * only when `min_relevance` is above 0, and one equality test per
 * `metadata_filter` entry.
 *
 * @param args - Search parameters; unset optional fields use the defaults
 *   `limit = 10`, `alpha = 0.5`, `min_relevance = 0`.
 * @returns `{ results, query }`: the rows as objects (`chunk_id`,
 *   `chunk_content`, `relevance`, `distance`) and the SQL text that was run.
 * @throws Error when `kb_name` or a `metadata_filter` key is not a
 *   dot-separated path of plain or backtick-quoted identifiers. Both are
 *   spliced into the statement as identifiers and could otherwise inject SQL.
 */
async function handleHybridSearchKb(args: z.infer<typeof HybridSearchKbSchema>) {
  const { kb_name, question, limit = 10, alpha = 0.5, min_relevance = 0, metadata_filter } = args;

  const identifierPath =
    /^(?:[\p{L}\p{N}_$]+|`[^`]+`)(?:\.(?:[\p{L}\p{N}_$]+|`[^`]+`))*$/u;
  // Returns `value` unchanged once it is known to be safe as an identifier.
  const requireIdentifier = (value: string, label: string): string => {
    if (!identifierPath.test(value)) {
      throw new Error(
        `Invalid ${label}: ${JSON.stringify(value)} is not a plain or backtick-quoted identifier`
      );
    }
    return value;
  };
  // Wraps text in single quotes, doubling embedded single quotes.
  // NOTE(review): backslashes are passed through as-is; confirm whether the
  // server treats backslash as an escape character inside string literals.
  const quote = (text: string): string => `'${text.replace(/'/g, "''")}'`;

  const table = requireIdentifier(kb_name, "kb_name");
  const conditions = [`content = ${quote(question)}`];
  if (alpha !== 0.5) {
    conditions.push(`hybrid_search_alpha = ${alpha}`);
  }
  if (min_relevance > 0) {
    conditions.push(`relevance >= ${min_relevance}`);
  }
  for (const [key, value] of Object.entries(metadata_filter ?? {})) {
    conditions.push(
      `${requireIdentifier(key, "metadata_filter key")} = ${quote(String(value))}`
    );
  }
  const sql =
    `SELECT chunk_id, chunk_content, relevance, distance FROM ${table}` +
    ` WHERE ${conditions.join(" AND ")} LIMIT ${limit}`;

  const { rows } = await executeQuery(sql);
  const results = rows.map((row) => ({
    chunk_id: row[0],
    chunk_content: row[1],
    relevance: row[2],
    distance: row[3],
  }));
  return { results, query: sql };
}
/**
 * Handler for the "insert_kb" tool: runs `INSERT INTO <kb_name> (<source_query>)`
 * with an optional USING clause.
 *
 * The USING clause lists only non-default settings: `batch_size` when it is
 * not 1000, `threads` when it is not 1, and `kb_skip_existing = true` when
 * `skip_existing` is set.
 *
 * @param args - Target knowledge base, source query and insertion options.
 *   `source_query` is SQL supplied by the caller and is embedded verbatim.
 * @returns `{ success: true, query }` with the SQL text that was run.
 * @throws Error when `kb_name` is not a dot-separated path of plain or
 *   backtick-quoted identifiers, since it is spliced in as an identifier.
 */
async function handleInsertKb(args: z.infer<typeof InsertKbSchema>) {
  const { kb_name, source_query, batch_size = 1000, threads = 1, skip_existing = false } = args;

  const identifierPath =
    /^(?:[\p{L}\p{N}_$]+|`[^`]+`)(?:\.(?:[\p{L}\p{N}_$]+|`[^`]+`))*$/u;
  if (!identifierPath.test(kb_name)) {
    throw new Error(
      `Invalid kb_name: ${JSON.stringify(kb_name)} is not a plain or backtick-quoted identifier`
    );
  }

  const usingParams: string[] = [];
  if (batch_size !== 1000) {
    usingParams.push(`batch_size = ${batch_size}`);
  }
  if (threads !== 1) {
    usingParams.push(`threads = ${threads}`);
  }
  if (skip_existing) {
    usingParams.push(`kb_skip_existing = true`);
  }

  let sql = `INSERT INTO ${kb_name} (${source_query})`;
  if (usingParams.length > 0) {
    sql += ` USING ${usingParams.join(", ")}`;
  }
  await executeQuery(sql);
  return { success: true, query: sql };
}
/**
 * Handler for the "create_kb" tool: builds and runs
 * `CREATE KNOWLEDGE_BASE <name> USING ...`.
 *
 * The USING list holds, in order: `embedding_model` and `content_columns`
 * (always), `metadata_columns` (when non-empty), `id_column` (when given),
 * `reranking_model` (when given) and `storage` (always, default "chromadb").
 * Model configs and column lists are written as JSON.
 *
 * @param args - Knowledge base definition.
 * @returns `{ kb_name, query }` with the name and the SQL text that was run.
 *   Note that `query` contains any `api_key` values that were supplied.
 * @throws Error when `name` is not a dot-separated path of plain or
 *   backtick-quoted identifiers, since it is spliced in as an identifier.
 */
async function handleCreateKb(args: z.infer<typeof CreateKbSchema>) {
  const {
    name,
    embedding_model,
    content_columns,
    metadata_columns,
    id_column,
    reranking_model,
    storage = "chromadb",
  } = args;

  const identifierPath =
    /^(?:[\p{L}\p{N}_$]+|`[^`]+`)(?:\.(?:[\p{L}\p{N}_$]+|`[^`]+`))*$/u;
  if (!identifierPath.test(name)) {
    throw new Error(
      `Invalid name: ${JSON.stringify(name)} is not a plain or backtick-quoted identifier`
    );
  }
  // Wraps text in single quotes, doubling embedded single quotes so the value
  // cannot close the literal early.
  const quote = (text: string): string => `'${text.replace(/'/g, "''")}'`;
  // Serializes provider/model_name, adding api_key only when one was given.
  const modelJson = (model: {
    provider: string;
    model_name: string;
    api_key?: string;
  }): string => {
    const modelConfig: Record<string, unknown> = {
      provider: model.provider,
      model_name: model.model_name,
    };
    if (model.api_key) {
      modelConfig.api_key = model.api_key;
    }
    return JSON.stringify(modelConfig);
  };

  const params: string[] = [
    `embedding_model = ${modelJson(embedding_model)}`,
    `content_columns = ${JSON.stringify(content_columns)}`,
  ];
  if (metadata_columns && metadata_columns.length > 0) {
    params.push(`metadata_columns = ${JSON.stringify(metadata_columns)}`);
  }
  if (id_column) {
    params.push(`id_column = ${quote(id_column)}`);
  }
  if (reranking_model) {
    params.push(`reranking_model = ${modelJson(reranking_model)}`);
  }
  params.push(`storage = ${quote(storage)}`);

  const sql = `CREATE KNOWLEDGE_BASE ${name} USING ${params.join(", ")}`;
  await executeQuery(sql);
  return { kb_name: name, query: sql };
}
/**
 * Handler for the "list_jobs" tool: runs SHOW JOBS and returns each row as an
 * object keyed by column name.
 */
async function handleListJobs() {
  const { columns, rows } = await executeQuery("SHOW JOBS");
  const jobs: Record<string, unknown>[] = [];
  for (const values of rows) {
    const job: Record<string, unknown> = {};
    for (let index = 0; index < columns.length; index++) {
      job[columns[index]] = values[index];
    }
    jobs.push(job);
  }
  return { jobs };
}
/**
 * Handler for the "create_job" tool: builds and runs
 * `CREATE JOB <name> ('<query>') EVERY <every> [START '<start>'] [END '<end>']`.
 *
 * @param args - Job name, the SQL to schedule, the interval, and optional
 *   start/end datetimes.
 * @returns `{ job_name, query }` with the name and the SQL text that was run.
 * @throws Error when `name` is not a dot-separated path of plain or
 *   backtick-quoted identifiers, or when `every` is not an optional count
 *   followed by a single unit word (e.g. "1 hour", "day"). Both are spliced
 *   into the statement unquoted and could otherwise inject SQL.
 */
async function handleCreateJob(args: z.infer<typeof CreateJobSchema>) {
  const { name, query, every, start, end } = args;

  const identifierPath =
    /^(?:[\p{L}\p{N}_$]+|`[^`]+`)(?:\.(?:[\p{L}\p{N}_$]+|`[^`]+`))*$/u;
  if (!identifierPath.test(name)) {
    throw new Error(
      `Invalid name: ${JSON.stringify(name)} is not a plain or backtick-quoted identifier`
    );
  }
  if (!/^\s*(?:\d+\s+)?[A-Za-z]+\s*$/.test(every)) {
    throw new Error(
      `Invalid every: ${JSON.stringify(every)} is not an interval such as '1 hour' or '15 minute'`
    );
  }
  // Wraps text in single quotes, doubling embedded single quotes.
  const quote = (text: string): string => `'${text.replace(/'/g, "''")}'`;

  // NOTE(review): the job body is sent as a quoted string literal inside the
  // parentheses. Confirm against the MindsDB CREATE JOB reference that this
  // form is accepted rather than a bare statement.
  let sql = `CREATE JOB ${name} (${quote(query)}) EVERY ${every}`;
  if (start) {
    sql += ` START ${quote(start)}`;
  }
  if (end) {
    sql += ` END ${quote(end)}`;
  }
  await executeQuery(sql);
  return { job_name: name, query: sql };
}
/**
 * Handler for the "list_tables" tool: runs `SHOW TABLES FROM <database>` and
 * returns the first column of every row as the list of table names.
 *
 * @param args - The `database` whose tables are listed.
 * @returns `{ tables }`.
 * @throws Error when `database` is not a dot-separated path of plain or
 *   backtick-quoted identifiers, since it is spliced in as an identifier.
 */
async function handleListTables(args: z.infer<typeof ListTablesSchema>) {
  const identifierPath =
    /^(?:[\p{L}\p{N}_$]+|`[^`]+`)(?:\.(?:[\p{L}\p{N}_$]+|`[^`]+`))*$/u;
  if (!identifierPath.test(args.database)) {
    throw new Error(
      `Invalid database: ${JSON.stringify(args.database)} is not a plain or backtick-quoted identifier`
    );
  }
  const { rows } = await executeQuery(`SHOW TABLES FROM ${args.database}`);
  const tables = rows.map((row) => row[0] as string);
  return { tables };
}
/**
 * Handler for the "connect_database" tool: builds and runs
 * `CREATE DATABASE <name> WITH ENGINE = '<engine>', PARAMETERS = <json>`.
 *
 * @param args - Connection name, engine name and connection parameters; the
 *   parameters are serialized with JSON.stringify.
 * @returns `{ database_name, query }` with the name and the SQL text that was
 *   run. Note that `query` contains the supplied password.
 * @throws Error when `name` is not a dot-separated path of plain or
 *   backtick-quoted identifiers, since it is spliced in as an identifier.
 */
async function handleConnectDatabase(args: z.infer<typeof ConnectDatabaseSchema>) {
  const { name, engine, parameters } = args;

  const identifierPath =
    /^(?:[\p{L}\p{N}_$]+|`[^`]+`)(?:\.(?:[\p{L}\p{N}_$]+|`[^`]+`))*$/u;
  if (!identifierPath.test(name)) {
    throw new Error(
      `Invalid name: ${JSON.stringify(name)} is not a plain or backtick-quoted identifier`
    );
  }
  // Double embedded single quotes so the engine name cannot close the literal.
  const engineLiteral = `'${engine.replace(/'/g, "''")}'`;

  const sql = `CREATE DATABASE ${name} WITH ENGINE = ${engineLiteral}, PARAMETERS = ${JSON.stringify(parameters)}`;
  await executeQuery(sql);
  return { database_name: name, query: sql };
}
// Main server setup: identity reported to clients and declared capabilities
// (tools only).
const serverInfo = { name: "mindsdb-mysql", version: "0.1.0" };
const serverOptions = { capabilities: { tools: {} } };
const server = new Server(serverInfo, serverOptions);

// Register tool list handler: every list request gets the full `tools` array.
server.setRequestHandler(ListToolsRequestSchema, async () => ({ tools }));
// Register tool call handler. The request's tool name selects a handler; the
// raw arguments are validated with that tool's Zod schema before the handler
// runs. The handler result is returned as pretty-printed JSON text. Any
// failure (unknown tool, validation error, query error) is returned as a JSON
// `{ error }` payload with `isError: true` instead of being thrown.
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  // Wraps a payload as the single text item of a tool response.
  const asTextContent = (payload: unknown) => [
    { type: "text" as const, text: JSON.stringify(payload, null, 2) },
  ];

  // One deferred call per tool; parsing happens only for the selected entry.
  const dispatch = new Map<string, () => Promise<unknown>>([
    ["query", () => handleQuery(QuerySchema.parse(args))],
    ["list_databases", () => handleListDatabases()],
    ["list_knowledge_bases", () => handleListKnowledgeBases()],
    ["describe", () => handleDescribe(DescribeSchema.parse(args))],
    ["search_kb", () => handleHybridSearchKb(HybridSearchKbSchema.parse(args))],
    ["insert_kb", () => handleInsertKb(InsertKbSchema.parse(args))],
    ["create_kb", () => handleCreateKb(CreateKbSchema.parse(args))],
    ["list_jobs", () => handleListJobs()],
    ["create_job", () => handleCreateJob(CreateJobSchema.parse(args))],
    ["list_tables", () => handleListTables(ListTablesSchema.parse(args))],
    ["connect_database", () => handleConnectDatabase(ConnectDatabaseSchema.parse(args))],
  ]);

  try {
    const run = dispatch.get(name);
    if (run === undefined) {
      throw new Error(`Unknown tool: ${name}`);
    }
    const result = await run();
    return { content: asTextContent(result) };
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    return { content: asTextContent({ error: message }), isError: true };
  }
});
// Handle graceful shutdown

/**
 * Closes the connection pool if one was created, then exits the process.
 * Exits with code 0 on success. If closing the pool fails, the error is
 * logged to stderr and the exit code is 1, so the failure is reported
 * explicitly rather than surfacing as an unhandled rejection.
 */
async function shutdown(): Promise<void> {
  let exitCode = 0;
  try {
    if (pool) {
      await pool.end();
    }
  } catch (error) {
    console.error("Error while closing the connection pool:", error);
    exitCode = 1;
  }
  process.exit(exitCode);
}

// SIGINT and SIGTERM share the same shutdown path.
for (const signal of ["SIGINT", "SIGTERM"] as const) {
  process.on(signal, () => {
    void shutdown();
  });
}
// Start the server

/**
 * Connects the MCP server to a stdio transport and reports the configured
 * MindsDB endpoint on stderr.
 */
async function main() {
  await server.connect(new StdioServerTransport());
  console.error(`MindsDB MySQL MCP server running (${config.host}:${config.port})`);
}

// Any startup failure is logged and ends the process with exit code 1.
main().catch((error) => {
  console.error("Fatal error:", error);
  process.exit(1);
});