/**
* Spawn and manage usql subprocesses
*/
import { spawn, ChildProcess } from "child_process";
import { createLogger } from "../utils/logger.js";
import { createUsqlError } from "../utils/error-handler.js";
import { formatConnectionStringForLogging } from "./connection.js";
import { UsqlExecutorOptions } from "../types/index.js";
const logger = createLogger("usql-mcp:process");
export interface UsqlExecutionResult {
stdout: string;
stderr: string;
exitCode: number;
}
export interface UsqlProcessHandle {
process: ChildProcess;
promise: Promise<UsqlExecutionResult>;
}
export async function executeUsqlCommand(
connectionString: string,
command: string,
options?: UsqlExecutorOptions & { signal?: AbortSignal }
): Promise<UsqlExecutionResult> {
const timeout = options?.timeout;
const format = options?.format || "json";
const signal = options?.signal;
let abortHandler: (() => void) | undefined;
logger.debug("[process-executor] Executing usql command", {
connectionString: formatConnectionStringForLogging(connectionString),
timeout,
format,
binaryPath: process.env.USQL_BINARY_PATH?.trim() || "usql",
});
return new Promise((resolve, reject) => {
let stdout = "";
let stderr = "";
const maxBuffer = 512 * 1024; // 512 KB cap per stream
let timedOut = false;
let aborted = false;
let timeoutHandle: NodeJS.Timeout | null = null;
let childProcess: ChildProcess | null = null;
if (typeof timeout === "number" && timeout > 0) {
timeoutHandle = setTimeout(() => {
timedOut = true;
if (signal && abortHandler) {
signal.removeEventListener("abort", abortHandler as () => void);
}
if (childProcess?.pid) {
logger.debug("[process-executor] Query timeout, killing process", {
pid: childProcess.pid,
});
try {
try {
process.kill(-childProcess.pid);
} catch {
process.kill(childProcess.pid);
}
} catch (e) {
logger.warn("[process-executor] Failed to kill process", { error: e });
}
}
reject(
createUsqlError(
"QueryTimeout",
`Query execution timed out after ${timeout}ms. Consider simplifying your query or increasing the timeout.`,
{ timeout, command: command.substring(0, 100) }
)
);
}, timeout);
}
// Handle abort signal
if (signal) {
if (signal.aborted) {
reject(new Error("Operation aborted"));
return;
}
abortHandler = () => {
aborted = true;
logger.debug("[process-executor] Abort signal received, killing process", {
pid: childProcess?.pid,
});
if (childProcess?.pid) {
try {
try {
process.kill(-childProcess.pid);
} catch {
process.kill(childProcess.pid);
}
} catch (e) {
logger.warn("[process-executor] Failed to kill process on abort", { error: e });
}
}
if (timeoutHandle) {
clearTimeout(timeoutHandle);
}
reject(new Error("Operation aborted by client"));
};
signal.addEventListener("abort", abortHandler);
const removeAbortHandler = (): void => {
signal.removeEventListener("abort", abortHandler as () => void);
};
// Attach cleanup listeners once we have a process instance
const attachCleanup = (proc: ChildProcess): void => {
proc.on("error", removeAbortHandler);
proc.on("close", removeAbortHandler);
};
if (childProcess) {
attachCleanup(childProcess);
}
}
// Build usql arguments
const args = [connectionString, "-c", command];
const configuredCommand = process.env.USQL_BINARY_PATH?.trim();
const commandToRun =
configuredCommand && configuredCommand.length > 0 ? configuredCommand : "usql";
// Add format flag
if (format === "json") {
args.push("--json");
} else if (format === "csv") {
args.push("--csv");
}
// Use detached process group for better cleanup
childProcess = spawn(commandToRun, args, {
detached: true,
stdio: ["pipe", "pipe", "pipe"],
env: process.env,
});
logger.debug("[process-executor] Spawned usql process", {
pid: childProcess.pid,
command: commandToRun,
args: args.map((arg, i) => (i === 0 ? formatConnectionStringForLogging(arg) : arg)),
});
childProcess.stdout?.on("data", (data) => {
const chunk = data.toString();
if (stdout.length + chunk.length <= maxBuffer) {
stdout += chunk;
}
});
childProcess.stderr?.on("data", (data) => {
const chunk = data.toString();
if (stderr.length + chunk.length <= maxBuffer) {
stderr += chunk;
}
});
childProcess.on("error", (error) => {
if (signal && abortHandler) {
signal.removeEventListener("abort", abortHandler as () => void);
}
if (timeoutHandle) {
clearTimeout(timeoutHandle);
}
if (!timedOut && !aborted) {
logger.error("[process-executor] Process error", error);
reject(error);
}
});
childProcess.on("close", (exitCode) => {
if (signal && abortHandler) {
signal.removeEventListener("abort", abortHandler as () => void);
}
if (timeoutHandle) {
clearTimeout(timeoutHandle);
}
if (timedOut || aborted) {
return; // Already rejected
}
logger.debug("[process-executor] Process exited", {
exitCode,
stdoutLength: stdout.length,
stderrLength: stderr.length,
});
resolve({
stdout,
stderr,
exitCode: exitCode || 0,
});
});
childProcess.stdin?.end();
});
}
export async function executeUsqlQuery(
connectionString: string,
query: string,
options?: UsqlExecutorOptions
): Promise<UsqlExecutionResult> {
// Don't escape - spawn() doesn't use shell, so no escaping needed
return executeUsqlCommand(connectionString, query, {
timeout: options?.timeout,
format: options?.format || "json",
signal: options?.signal,
});
}
/* global AbortSignal */