Skip to main content
Glama
query-sqlite.ts11.4 kB
import type { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; import { z } from 'zod'; import sqlite3 from 'sqlite3'; import fs from 'fs-extra'; import { quote } from 'shell-quote'; /** * Tool: SQLite Database Query * Registers the tool with the MCP server * @param server MCP server instance */ const registerTool = (server: McpServer) => { server.registerTool( 'query-sqlite', { title: 'SQLite Database Query', description: "Performs various SQLite database operations including creating databases, executing queries, importing/exporting CSV, getting schema info, and more.", inputSchema: { database_path: z.string().describe("Path to the SQLite database file."), operation: z.string().describe("The operation to perform: create_db, execute_query, import_csv, export_csv, get_schema, drop_table, backup, restore, transaction, get_stats."), query: z.string().optional().describe("SQL query to execute (for execute_query and transaction operations)."), table_name: z.string().optional().describe("Table name (for drop_table operation)."), csv_path: z.string().optional().describe("Path to CSV file (for import_csv and export_csv operations)."), backup_path: z.string().optional().describe("Path to backup file (for backup and restore operations)."), } }, async (params) => { const { database_path, operation, query, table_name, csv_path, backup_path } = params; try { // Ensure the directory exists await fs.ensureDir(require('path').dirname(database_path)); // Open database const db = new sqlite3.Database(database_path); const dbRun = (sql: string, params?: any[]) => { return new Promise((resolve, reject) => { db.run(sql, params, function(err) { if (err) reject(err); else resolve(this); }); }); }; const dbAll = (sql: string, params?: any[]) => { return new Promise<any[]>((resolve, reject) => { db.all(sql, params, (err, rows) => { if (err) reject(err); else resolve(rows); }); }); }; const dbGet = (sql: string, params?: any[]) => { return new Promise<any>((resolve, reject) => { db.get(sql, params, (err, row) => { if (err) reject(err); else resolve(row); }); }); }; const dbClose = () => { return new Promise<void>((resolve, reject) => { db.close((err) => { if (err) reject(err); else resolve(); }); }); }; let resultText = ''; switch (operation) { case 'create_db': // Database is created when opened resultText = `SQLite database created at ${database_path}`; break; case 'execute_query': if (!query) { await dbClose(); return { content: [{ type: 'text', text: "Error: 'query' parameter is required for execute_query operation." }], isError: true }; } // For SELECT queries, return results if (query.trim().toUpperCase().startsWith('SELECT')) { const rows = await dbAll(query); resultText = JSON.stringify(rows, null, 2); } else { // For INSERT/UPDATE/DELETE, return changes const result: any = await dbRun(query); resultText = `Query executed successfully. Rows affected: ${result.changes || 0}`; } break; case 'import_csv': if (!csv_path) { await dbClose(); return { content: [{ type: 'text', text: "Error: 'csv_path' parameter is required for import_csv operation." }], isError: true }; } // Check if CSV file exists if (!await fs.pathExists(csv_path)) { await dbClose(); return { content: [{ type: 'text', text: `Error: CSV file not found at ${csv_path}` }], isError: true }; } // Read CSV file const csvContent = await fs.readFile(csv_path, 'utf-8'); const lines = csvContent.split('\n').filter(line => line.trim() !== ''); if (lines.length < 2) { await dbClose(); return { content: [{ type: 'text', text: "Error: CSV file must contain at least a header row and one data row." }], isError: true }; } // Get table name from CSV file name if not provided const tableName = table_name || require('path').basename(csv_path, '.csv'); // Parse header const header = lines[0].split(',').map(h => h.trim()); // Create table const createTableQuery = `CREATE TABLE IF NOT EXISTS ${tableName} (${header.map(h => `${h} TEXT`).join(', ')})`; await dbRun(createTableQuery); // Insert data for (let i = 1; i < lines.length; i++) { const values = lines[i].split(',').map(v => v.trim()); if (values.length === header.length) { const placeholders = header.map(() => '?').join(', '); const insertQuery = `INSERT INTO ${tableName} VALUES (${placeholders})`; await dbRun(insertQuery, values); } } resultText = `CSV data imported successfully into table ${tableName}`; break; case 'export_csv': if (!table_name || !csv_path) { await dbClose(); return { content: [{ type: 'text', text: "Error: 'table_name' and 'csv_path' parameters are required for export_csv operation." }], isError: true }; } // Get table data const rows = await dbAll(`SELECT * FROM ${table_name}`); if (rows.length === 0) { await dbClose(); return { content: [{ type: 'text', text: `Table ${table_name} is empty or does not exist.` }], isError: true }; } // Generate CSV content const headers = Object.keys(rows[0]); const csvLines = [headers.join(',')]; rows.forEach(row => { const values = headers.map(header => { const value = row[header]; // Escape commas and quotes in values if (typeof value === 'string' && (value.includes(',') || value.includes('"'))) { return `"${value.replace(/"/g, '""')}"`; } return value; }); csvLines.push(values.join(',')); }); // Write CSV file await fs.writeFile(csv_path, csvLines.join('\n')); resultText = `Table ${table_name} exported successfully to ${csv_path}`; break; case 'get_schema': const schemaRows = await dbAll(` SELECT name, sql FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name `); resultText = JSON.stringify(schemaRows, null, 2); break; case 'drop_table': if (!table_name) { await dbClose(); return { content: [{ type: 'text', text: "Error: 'table_name' parameter is required for drop_table operation." }], isError: true }; } await dbRun(`DROP TABLE IF EXISTS ${table_name}`); resultText = `Table ${table_name} dropped successfully`; break; case 'backup': if (!backup_path) { await dbClose(); return { content: [{ type: 'text', text: "Error: 'backup_path' parameter is required for backup operation." }], isError: true }; } // Close the database first await dbClose(); // Copy database file to backup location await fs.copy(database_path, backup_path); resultText = `Database backed up successfully to ${backup_path}`; break; case 'restore': if (!backup_path) { return { content: [{ type: 'text', text: "Error: 'backup_path' parameter is required for restore operation." }], isError: true }; } // Close the database first await dbClose(); // Copy backup file to database location await fs.copy(backup_path, database_path); resultText = `Database restored successfully from ${backup_path}`; break; case 'transaction': if (!query) { await dbClose(); return { content: [{ type: 'text', text: "Error: 'query' parameter is required for transaction operation." }], isError: true }; } // Begin transaction await dbRun('BEGIN TRANSACTION'); try { // Execute queries const queries = query.split(';').filter(q => q.trim() !== ''); for (const q of queries) { await dbRun(q); } // Commit transaction await dbRun('COMMIT'); resultText = 'Transaction executed successfully'; } catch (error) { // Rollback transaction on error await dbRun('ROLLBACK'); throw error; } break; case 'get_stats': const stats = await fs.stat(database_path); const tableCountRow = await dbGet(` SELECT COUNT(*) as count FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' `); resultText = JSON.stringify({ fileSize: stats.size, lastModified: stats.mtime, tableCount: tableCountRow?.count || 0 }, null, 2); break; } // Close database connection await dbClose(); return { content: [{ type: 'text', text: resultText }] }; } catch (error: any) { return { content: [{ type: 'text', text: `Error performing SQLite operation: ${error.message}` }], isError: true }; } } ); }; export default registerTool;

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Yussefgafer/MyMCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server