#!/usr/bin/env node
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
// @ts-ignore - msnodesqlv8 doesn't have types
import msnodesqlv8 from "msnodesqlv8";
import { z } from "zod";
// Connection string for LocalDB using ODBC Driver 17
/**
 * Build the ODBC connection string for the LocalDB instance.
 *
 * The database name ends up inside a `;`-separated attribute list, so a name
 * containing ODBC-special characters (for example `;`) could otherwise end the
 * `Database` attribute early and smuggle in further attributes. Such names are
 * wrapped in braces, with any `}` doubled, so they are read as a single value.
 * Names without special characters are emitted verbatim.
 *
 * @param database - Database to open; defaults to "master".
 * @returns Connection string using ODBC Driver 17 with a trusted connection.
 */
const getConnectionString = (database: string = "master"): string => {
  const needsBraces = /[\[\]{}(),;?*=!@]/.test(database);
  const databaseValue = needsBraces
    ? "{" + database.replace(/\}/g, "}}") + "}"
    : database;
  return `Driver={ODBC Driver 17 for SQL Server};Server=(localdb)\\mssqllocaldb;Database=${databaseValue};Trusted_Connection=yes;`;
};
/** Database used when a tool call does not name one; reassigned by the `connect_database` tool. */
let currentDatabase = "master";
// Helper function to run a query
/**
 * Run a SQL batch through msnodesqlv8 and resolve with the rows of its first
 * result set (an empty array when the first callback carries no rows).
 *
 * msnodesqlv8 documents its query callback as `(err, rows, more)` and may call
 * it several times for one batch, with `more` truthy while further callbacks
 * are pending. Settling on the first call would hide an error raised by a later
 * statement, so the promise stays pending until `more` is falsy and rejects as
 * soon as any callback reports an error.
 *
 * @param connectionString - ODBC connection string to open.
 * @param query - SQL text to execute.
 * @returns Rows of the first result set.
 */
function runQuery(connectionString: string, query: string): Promise<any[]> {
  return new Promise((resolve, reject) => {
    let firstRows: any[] | undefined;
    msnodesqlv8.query(
      connectionString,
      query,
      (err: any, rows?: any[], more?: boolean) => {
        if (err) {
          // Rejecting immediately keeps the original fail-fast behaviour; any
          // later resolve() on an already-rejected promise is a no-op.
          reject(err);
          return;
        }
        if (firstRows === undefined) {
          firstRows = rows || [];
        }
        if (!more) {
          resolve(firstRows);
        }
      }
    );
  });
}
// Helper to get connection string for current or specified database
/**
 * Resolve the connection string for an optional database argument.
 *
 * @param database - Explicit database name; when omitted or empty, the
 *   module-level `currentDatabase` is used instead.
 * @returns Result of `getConnectionString` for the chosen database.
 */
function getConnString(database?: string): string {
  return getConnectionString(database || currentDatabase);
}
// Create the MCP server
/** MCP server instance on which the tools in this file are registered. */
const server = new McpServer({ name: "localdb-mcp", version: "1.0.0" });
// Tool: List all databases
// Registers `list_databases`: returns one JSON row per entry in sys.databases
// (name, id, creation date, state and recovery model), ordered by name.
server.tool(
  "list_databases",
  "List all databases on the LocalDB instance",
  {},
  async () => {
    const listSql = [
      "",
      "SELECT",
      "name,",
      "database_id,",
      "create_date,",
      "state_desc,",
      "recovery_model_desc",
      "FROM sys.databases",
      "ORDER BY name",
      "",
    ].join("\n");
    try {
      // Always runs against master, regardless of the currently selected database.
      const databases = await runQuery(getConnectionString("master"), listSql);
      return {
        content: [{ type: "text", text: JSON.stringify(databases, null, 2) }],
      };
    } catch (failure) {
      const reason = failure instanceof Error ? failure.message : String(failure);
      return {
        content: [{ type: "text", text: `Error listing databases: ${reason}` }],
        isError: true,
      };
    }
  }
);
// Tool: Connect to a database
// Registers `connect_database`: probes the named database with a trivial query
// and, only if that succeeds, makes it the default for later tool calls.
server.tool(
  "connect_database",
  "Connect to a specific database on LocalDB",
  {
    database: z.string().describe("Name of the database to connect to"),
  },
  async ({ database }) => {
    try {
      // The probe result is discarded; only success or failure matters.
      await runQuery(getConnectionString(database), "SELECT 1 as test");
    } catch (failure) {
      const reason = failure instanceof Error ? failure.message : String(failure);
      return {
        content: [{ type: "text", text: `Error connecting to database: ${reason}` }],
        isError: true,
      };
    }

    currentDatabase = database;
    return {
      content: [{ type: "text", text: `Successfully connected to database: ${database}` }],
    };
  }
);
// Tool: List tables in current database
// Registers `list_tables`: returns schema, name and type for every entry in
// INFORMATION_SCHEMA.TABLES of the chosen (or current) database.
server.tool(
  "list_tables",
  "List all tables in the current database",
  {
    database: z.string().optional().describe("Database name (uses current if not specified)"),
  },
  async ({ database }) => {
    const tablesSql = [
      "",
      "SELECT",
      "TABLE_SCHEMA as [schema],",
      "TABLE_NAME as [table_name],",
      "TABLE_TYPE as [table_type]",
      "FROM INFORMATION_SCHEMA.TABLES",
      "ORDER BY TABLE_SCHEMA, TABLE_NAME",
      "",
    ].join("\n");
    try {
      const tables = await runQuery(getConnString(database), tablesSql);
      return {
        content: [{ type: "text", text: JSON.stringify(tables, null, 2) }],
      };
    } catch (failure) {
      const reason = failure instanceof Error ? failure.message : String(failure);
      return {
        content: [{ type: "text", text: `Error listing tables: ${reason}` }],
        isError: true,
      };
    }
  }
);
// Tool: Describe table schema
// Registers `describe_table`: returns the columns (with a primary-key flag),
// foreign keys and indexes of one table as a single JSON document.
//
// `table` and `schema` come from the tool caller and are embedded in SQL text,
// so they are never interpolated raw: values used as string literals have `'`
// doubled, and the name handed to OBJECT_ID is bracket-quoted with `]` doubled.
// A name that arrives already wrapped in one pair of brackets is unwrapped
// first so that all three queries see the same bare name.
server.tool(
  "describe_table",
  "Get the schema/structure of a table including columns, data types, and constraints",
  {
    table: z.string().describe("Name of the table to describe"),
    schema: z.string().optional().default("dbo").describe("Schema name (defaults to 'dbo')"),
    database: z.string().optional().describe("Database name (uses current if not specified)"),
  },
  async ({ table, schema, database }) => {
    // "[My Table]" -> "My Table"; anything that is not exactly one bracketed
    // identifier (e.g. "[a].[b]") is returned unchanged.
    const unwrapBrackets = (name: string): string => {
      if (name.length < 2 || !name.startsWith("[") || !name.endsWith("]")) {
        return name;
      }
      const inner = name.slice(1, -1);
      const hasLoneBracket = inner.replace(/\]\]/g, "").includes("]");
      return hasLoneBracket ? name : inner.replace(/\]\]/g, "]");
    };
    // Render a value as a T-SQL Unicode string literal.
    const toLiteral = (value: string): string => `N'${value.replace(/'/g, "''")}'`;
    // Render a name as a bracket-delimited T-SQL identifier.
    const toBracketed = (name: string): string => `[${name.replace(/\]/g, "]]")}]`;

    try {
      const connStr = getConnString(database);
      const tableName = unwrapBrackets(table);
      const schemaName = unwrapBrackets(schema);
      const tableLiteral = toLiteral(tableName);
      const schemaLiteral = toLiteral(schemaName);
      const objectLiteral = toLiteral(`${toBracketed(schemaName)}.${toBracketed(tableName)}`);

      // Column metadata plus a YES/NO primary-key marker.
      const columns = await runQuery(connStr, [
        "",
        "SELECT",
        "c.COLUMN_NAME,",
        "c.DATA_TYPE,",
        "c.CHARACTER_MAXIMUM_LENGTH,",
        "c.NUMERIC_PRECISION,",
        "c.NUMERIC_SCALE,",
        "c.IS_NULLABLE,",
        "c.COLUMN_DEFAULT,",
        "CASE WHEN pk.COLUMN_NAME IS NOT NULL THEN 'YES' ELSE 'NO' END as IS_PRIMARY_KEY",
        "FROM INFORMATION_SCHEMA.COLUMNS c",
        "LEFT JOIN (",
        "SELECT ku.TABLE_SCHEMA, ku.TABLE_NAME, ku.COLUMN_NAME",
        "FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc",
        "JOIN INFORMATION_SCHEMA.KEY_COLUMN_USAGE ku",
        "ON tc.CONSTRAINT_NAME = ku.CONSTRAINT_NAME",
        // Constraint names are only unique within a schema, so match it as well.
        "AND tc.CONSTRAINT_SCHEMA = ku.CONSTRAINT_SCHEMA",
        "WHERE tc.CONSTRAINT_TYPE = 'PRIMARY KEY'",
        ") pk ON c.TABLE_SCHEMA = pk.TABLE_SCHEMA",
        "AND c.TABLE_NAME = pk.TABLE_NAME",
        "AND c.COLUMN_NAME = pk.COLUMN_NAME",
        `WHERE c.TABLE_NAME = ${tableLiteral} AND c.TABLE_SCHEMA = ${schemaLiteral}`,
        "ORDER BY c.ORDINAL_POSITION",
        "",
      ].join("\n"));

      // Foreign keys whose parent (referencing) table is the requested one.
      const foreignKeys = await runQuery(connStr, [
        "",
        "SELECT",
        "fk.name as FK_NAME,",
        "COL_NAME(fkc.parent_object_id, fkc.parent_column_id) as COLUMN_NAME,",
        "OBJECT_NAME(fkc.referenced_object_id) as REFERENCED_TABLE,",
        "COL_NAME(fkc.referenced_object_id, fkc.referenced_column_id) as REFERENCED_COLUMN",
        "FROM sys.foreign_keys fk",
        "JOIN sys.foreign_key_columns fkc ON fk.object_id = fkc.constraint_object_id",
        `WHERE OBJECT_NAME(fk.parent_object_id) = ${tableLiteral}`,
        `AND SCHEMA_NAME(fk.schema_id) = ${schemaLiteral}`,
        "",
      ].join("\n"));

      // Named indexes with their columns aggregated in key order.
      const indexes = await runQuery(connStr, [
        "",
        "SELECT",
        "i.name as INDEX_NAME,",
        "i.type_desc as INDEX_TYPE,",
        "i.is_unique as IS_UNIQUE,",
        "STRING_AGG(c.name, ', ') WITHIN GROUP (ORDER BY ic.key_ordinal) as COLUMNS",
        "FROM sys.indexes i",
        "JOIN sys.index_columns ic ON i.object_id = ic.object_id AND i.index_id = ic.index_id",
        "JOIN sys.columns c ON ic.object_id = c.object_id AND ic.column_id = c.column_id",
        `WHERE i.object_id = OBJECT_ID(${objectLiteral})`,
        "AND i.name IS NOT NULL",
        "GROUP BY i.name, i.type_desc, i.is_unique",
        "",
      ].join("\n"));

      const output = {
        table: `${schema}.${table}`,
        columns,
        foreignKeys,
        indexes,
      };
      return {
        content: [{ type: "text", text: JSON.stringify(output, null, 2) }],
      };
    } catch (error) {
      return {
        content: [
          {
            type: "text",
            text: `Error describing table: ${error instanceof Error ? error.message : String(error)}`,
          },
        ],
        isError: true,
      };
    }
  }
);
// Tool: Execute a SELECT query (read-only)
/**
 * Best-effort check that a SQL text is a single read-only statement.
 *
 * String literals, delimited identifiers and comments are blanked out first so
 * that their contents can neither trigger nor hide a match. The remaining text
 * must start with SELECT, WITH or EXEC sp_help*, must not contain a second
 * statement after a `;`, and must not contain a data- or schema-modifying
 * keyword. All listed keywords are reserved words in T-SQL, so they cannot
 * appear as unquoted identifiers in a legitimate query.
 *
 * This is a guard against accidental writes, not a security boundary: T-SQL
 * does not require `;` between statements and not every side-effecting
 * construct can be recognised lexically.
 *
 * @param sqlText - SQL supplied by the tool caller.
 * @returns An error message when the text is rejected, otherwise `undefined`.
 */
function findReadOnlyViolation(sqlText: string): string | undefined {
  const nonCode =
    /'(?:[^']|'')*'|\[(?:[^\]]|\]\])*\]|"(?:[^"]|"")*"|--[^\n]*|\/\*[\s\S]*?\*\//g;
  const code = sqlText.replace(nonCode, " ").trim().toUpperCase();

  const isSpHelp = /^EXEC\s+SP_HELP/.test(code);
  if (!isSpHelp && !/^(?:SELECT|WITH)\b/.test(code)) {
    return "Error: Only SELECT queries, WITH (CTEs), and EXEC sp_help are allowed. Use execute_sql for other operations.";
  }

  // A trailing ";" is fine; anything after it is a second statement.
  if (/;\s*\S/.test(code)) {
    return "Error: Multiple statements are not allowed in a read-only query. Use execute_sql for other operations.";
  }

  // The leading EXEC of the sp_help form is permitted; any other EXEC is not.
  const scanned = isSpHelp ? code.replace(/^EXEC\s+/, "") : code;
  // MERGE JOIN / MERGE UNION are query hints, not the MERGE statement.
  const forbidden =
    /\b(INSERT|UPDATE|DELETE|MERGE(?!\s+(?:JOIN|UNION))|DROP|ALTER|CREATE|TRUNCATE|EXEC|EXECUTE|INTO|GRANT|REVOKE|DENY|BACKUP|RESTORE|DBCC|KILL|SHUTDOWN)\b/;
  const hit = forbidden.exec(scanned);
  if (hit) {
    return `Error: ${hit[1]} is not allowed in a read-only query. Use execute_sql for other operations.`;
  }
  return undefined;
}

// Registers `query`: runs a caller-supplied read-only statement and returns the
// rows together with their count. Statements rejected by
// `findReadOnlyViolation` are reported as tool errors without being executed.
server.tool(
  "query",
  "Execute a SELECT query against the database. Only SELECT statements are allowed for safety.",
  {
    sql: z.string().describe("The SELECT SQL query to execute"),
    database: z.string().optional().describe("Database name (uses current if not specified)"),
  },
  async ({ sql: sqlQuery, database }) => {
    try {
      const violation = findReadOnlyViolation(sqlQuery);
      if (violation !== undefined) {
        return {
          content: [{ type: "text", text: violation }],
          isError: true,
        };
      }

      const connStr = getConnString(database);
      const rows = await runQuery(connStr, sqlQuery);
      return {
        content: [
          {
            type: "text",
            text: JSON.stringify({
              rowCount: rows?.length || 0,
              rows: rows,
            }, null, 2),
          },
        ],
      };
    } catch (error) {
      return {
        content: [
          {
            type: "text",
            text: `Error executing query: ${error instanceof Error ? error.message : String(error)}`,
          },
        ],
        isError: true,
      };
    }
  }
);
// Tool: Execute any SQL statement (runs immediately; there is no confirmation step)
// Registers `execute_sql`: passes the caller's SQL text to the database
// unmodified and returns whatever rows `runQuery` yields under a `result` key.
server.tool(
  "execute_sql",
  "Execute any SQL statement (INSERT, UPDATE, DELETE, CREATE, ALTER, DROP, etc.). Use with caution!",
  {
    sql: z.string().describe("The SQL statement to execute"),
    database: z.string().optional().describe("Database name (uses current if not specified)"),
  },
  async ({ sql: statement, database }) => {
    try {
      const outcome = await runQuery(getConnString(database), statement);
      const payload = JSON.stringify({ result: outcome }, null, 2);
      return {
        content: [{ type: "text", text: payload }],
      };
    } catch (failure) {
      const reason = failure instanceof Error ? failure.message : String(failure);
      return {
        content: [{ type: "text", text: `Error executing SQL: ${reason}` }],
        isError: true,
      };
    }
  }
);
// Tool: Get database info
// Registers `database_info`: reports the database name, server version and
// edition, the number of base tables, views and stored procedures, and the
// total size of the database files in MB.
//
// The five lookups are independent, so they are issued together instead of
// one after another. The size is computed by summing 8 KB pages across all
// files before converting to MB; converting each file separately would
// discard every file's fractional megabyte.
server.tool(
  "database_info",
  "Get detailed information about the current or specified database",
  {
    database: z.string().optional().describe("Database name (uses current if not specified)"),
  },
  async ({ database }) => {
    try {
      const connStr = getConnString(database);

      const [dbInfo, tableCount, viewCount, spCount, sizeInfo] = await Promise.all([
        // Database and server properties
        runQuery(connStr, [
          "",
          "SELECT",
          "DB_NAME() as database_name,",
          "@@VERSION as sql_version,",
          "SERVERPROPERTY('ProductVersion') as product_version,",
          "SERVERPROPERTY('Edition') as edition",
          "",
        ].join("\n")),
        // Base table count
        runQuery(connStr, [
          "",
          "SELECT COUNT(*) as table_count FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE'",
          "",
        ].join("\n")),
        // View count
        runQuery(connStr, [
          "",
          "SELECT COUNT(*) as view_count FROM INFORMATION_SCHEMA.VIEWS",
          "",
        ].join("\n")),
        // Stored procedure count
        runQuery(connStr, [
          "",
          "SELECT COUNT(*) as sp_count FROM INFORMATION_SCHEMA.ROUTINES WHERE ROUTINE_TYPE = 'PROCEDURE'",
          "",
        ].join("\n")),
        // Total file size; bigint avoids overflow in the intermediate product.
        runQuery(connStr, [
          "",
          "SELECT",
          "CAST(SUM(CAST(size AS bigint)) * 8 / 1024 AS int) as size_mb",
          "FROM sys.database_files",
          "",
        ].join("\n")),
      ]);

      const output = {
        ...dbInfo[0],
        table_count: tableCount[0]?.table_count,
        view_count: viewCount[0]?.view_count,
        stored_procedure_count: spCount[0]?.sp_count,
        size_mb: sizeInfo[0]?.size_mb,
      };
      return {
        content: [{ type: "text", text: JSON.stringify(output, null, 2) }],
      };
    } catch (error) {
      return {
        content: [
          {
            type: "text",
            text: `Error getting database info: ${error instanceof Error ? error.message : String(error)}`,
          },
        ],
        isError: true,
      };
    }
  }
);
// Tool: Get stored procedures
// Registers `list_stored_procedures`: returns schema, name, creation date and
// last-altered date for every PROCEDURE routine in the chosen database.
server.tool(
  "list_stored_procedures",
  "List all stored procedures in the database",
  {
    database: z.string().optional().describe("Database name (uses current if not specified)"),
  },
  async ({ database }) => {
    const proceduresSql = [
      "",
      "SELECT",
      "ROUTINE_SCHEMA as [schema],",
      "ROUTINE_NAME as [name],",
      "CREATED as created_date,",
      "LAST_ALTERED as last_modified",
      "FROM INFORMATION_SCHEMA.ROUTINES",
      "WHERE ROUTINE_TYPE = 'PROCEDURE'",
      "ORDER BY ROUTINE_SCHEMA, ROUTINE_NAME",
      "",
    ].join("\n");
    try {
      const procedures = await runQuery(getConnString(database), proceduresSql);
      return {
        content: [{ type: "text", text: JSON.stringify(procedures, null, 2) }],
      };
    } catch (failure) {
      const reason = failure instanceof Error ? failure.message : String(failure);
      return {
        content: [{ type: "text", text: `Error listing stored procedures: ${reason}` }],
        isError: true,
      };
    }
  }
);
// Tool: Get stored procedure definition
// Registers `get_procedure_definition`: returns the text produced by
// OBJECT_DEFINITION for `schema.procedure`, or a fixed fallback message when
// the lookup yields nothing.
//
// `schema` and `procedure` come from the tool caller and are embedded in a SQL
// string literal, so each name is bracket-quoted (`]` doubled) and the
// resulting literal has `'` doubled. A name already wrapped in one pair of
// brackets is unwrapped first so it is not quoted twice.
server.tool(
  "get_procedure_definition",
  "Get the definition/source code of a stored procedure",
  {
    procedure: z.string().describe("Name of the stored procedure"),
    schema: z.string().optional().default("dbo").describe("Schema name (defaults to 'dbo')"),
    database: z.string().optional().describe("Database name (uses current if not specified)"),
  },
  async ({ procedure, schema, database }) => {
    // "[My Proc]" -> "My Proc"; anything that is not exactly one bracketed
    // identifier (e.g. "[a].[b]") is returned unchanged.
    const unwrapBrackets = (name: string): string => {
      if (name.length < 2 || !name.startsWith("[") || !name.endsWith("]")) {
        return name;
      }
      const inner = name.slice(1, -1);
      const hasLoneBracket = inner.replace(/\]\]/g, "").includes("]");
      return hasLoneBracket ? name : inner.replace(/\]\]/g, "]");
    };
    // Render a name as a bracket-delimited T-SQL identifier.
    const toBracketed = (name: string): string => `[${name.replace(/\]/g, "]]")}]`;

    try {
      const connStr = getConnString(database);
      const qualifiedName = `${toBracketed(unwrapBrackets(schema))}.${toBracketed(unwrapBrackets(procedure))}`;
      const objectLiteral = `N'${qualifiedName.replace(/'/g, "''")}'`;
      const rows = await runQuery(
        connStr,
        `\nSELECT OBJECT_DEFINITION(OBJECT_ID(${objectLiteral})) as definition\n`
      );
      return {
        content: [
          {
            type: "text",
            text: rows[0]?.definition || "Procedure not found or definition not available",
          },
        ],
      };
    } catch (error) {
      return {
        content: [
          {
            type: "text",
            text: `Error getting procedure definition: ${error instanceof Error ? error.message : String(error)}`,
          },
        ],
        isError: true,
      };
    }
  }
);
// Tool: Sample data from a table
// Registers `sample_data`: returns up to `limit` rows (1-100, default 10) from
// `schema.table` together with the row count.
//
// `schema` and `table` come from the tool caller and are placed inside bracket
// delimiters, so any `]` in them is doubled to keep the name from closing the
// delimiter early. `limit` is truncated to an integer and clamped, because TOP
// rejects fractional and negative values.
server.tool(
  "sample_data",
  "Get sample rows from a table to understand its data",
  {
    table: z.string().describe("Name of the table"),
    schema: z.string().optional().default("dbo").describe("Schema name (defaults to 'dbo')"),
    limit: z.number().optional().default(10).describe("Number of rows to return (default: 10, max: 100)"),
    database: z.string().optional().describe("Database name (uses current if not specified)"),
  },
  async ({ table, schema, limit, database }) => {
    // Render a name as a bracket-delimited T-SQL identifier.
    const toBracketed = (name: string): string => `[${name.replace(/\]/g, "]]")}]`;

    try {
      // `limit || 10` keeps the existing rule that 0 (or NaN) means "use the default".
      const safeLimit = Math.max(1, Math.min(Math.trunc(limit || 10), 100));
      const connStr = getConnString(database);
      const rows = await runQuery(
        connStr,
        `\nSELECT TOP ${safeLimit} * FROM ${toBracketed(schema)}.${toBracketed(table)}\n`
      );
      return {
        content: [
          {
            type: "text",
            text: JSON.stringify({
              table: `${schema}.${table}`,
              rowCount: rows.length,
              rows: rows,
            }, null, 2),
          },
        ],
      };
    } catch (error) {
      return {
        content: [
          {
            type: "text",
            text: `Error sampling data: ${error instanceof Error ? error.message : String(error)}`,
          },
        ],
        isError: true,
      };
    }
  }
);
// Main function
/**
 * Attach the server to a stdio transport and announce readiness.
 * The readiness message is written with console.error, i.e. to stderr.
 */
async function main() {
  await server.connect(new StdioServerTransport());
  console.error("LocalDB MCP Server running on stdio");
}
// Entry point: start the server; if startup rejects, log the cause to stderr
// and exit with status 1.
main().catch((fatal: unknown) => {
  console.error("Fatal error in main():", fatal);
  process.exit(1);
});