import { PassThrough } from "node:stream";
import type Docker from "dockerode";
import { validateCommandAllowlist } from "../../../utils/command-security.js";
/**
 * Stream context for container exec operations.
 *
 * Bundles the streams, captured output chunks, byte counters, and overflow
 * flag that the exec helpers in this module share while a command runs.
 */
export interface ExecStreamContext {
/** Raw stream returned by `exec.start()`; it is demultiplexed into `stdoutStream` and `stderrStream`. */
stream: NodeJS.ReadableStream;
/** Destination for the stdout half of the demultiplexed exec stream. */
stdoutStream: PassThrough;
/** Destination for the stderr half of the demultiplexed exec stream. */
stderrStream: PassThrough;
/** Stdout chunks accepted so far, in arrival order. */
stdoutChunks: Buffer[];
/** Stderr chunks accepted so far, in arrival order. */
stderrChunks: Buffer[];
/** Total byte length of the chunks stored in `stdoutChunks`. */
stdoutSize: number;
/** Total byte length of the chunks stored in `stderrChunks`. */
stderrSize: number;
/** Set to `{ value: true }` once a chunk would push either channel past the buffer limit. */
bufferExceeded: { value: boolean };
/** Destroys `stream`, `stdoutStream`, and `stderrStream`, ignoring any errors thrown while doing so. */
cleanup: () => void;
}
/**
 * Settlement handlers for promise resolution.
 *
 * The pair produced by `createSettlementHandlers` shares one guard, so only
 * the first call to either handler has any effect.
 */
export interface SettlementHandlers {
/** Settle the pending operation as failed, rejecting with `error`. */
settleWithRejection: (error: Error) => void;
/** Settle the pending operation as succeeded. */
settleWithSuccess: () => void;
}
/**
 * Create a Docker exec instance for a command that has been run through the
 * allowlist validator.
 *
 * The command string is passed to `validateCommandAllowlist`, and whatever that
 * returns is used verbatim as the exec `Cmd`. Both stdout and stderr are
 * attached; stdin is not requested here.
 *
 * @param container - Container in which the command will be executed
 * @param options - Command string plus optional user and working directory
 * @returns The exec instance produced by `container.exec`
 * @throws Propagates any error raised by `validateCommandAllowlist` or `container.exec`
 */
export async function setupExecInstance(
  container: Docker.Container,
  options: { command: string; user?: string; workdir?: string }
): Promise<Docker.Exec> {
  const argv = validateCommandAllowlist(options.command);

  const execInstance = await container.exec({
    Cmd: argv,
    AttachStdout: true,
    AttachStderr: true,
    User: options.user,
    WorkingDir: options.workdir,
  });

  return execInstance;
}
/**
 * Start recording stdout/stderr output while enforcing a per-channel size cap.
 *
 * A `data` listener is registered on each of the two output streams. Every
 * accepted chunk is appended to the matching chunk list and added to the
 * matching byte counter on `context`. The first chunk that would push a
 * channel's counter above `maxBuffer` is discarded, `bufferExceeded.value` is
 * set to `true`, and `context.cleanup()` is invoked. From then on both
 * listeners ignore all further data.
 *
 * @param context - Streams, chunk lists, counters, overflow flag, and cleanup hook
 * @param maxBuffer - Maximum number of bytes retained per channel
 */
export function setupBufferTracking(
  context: Pick<
    ExecStreamContext,
    | "stdoutStream"
    | "stderrStream"
    | "stdoutChunks"
    | "stderrChunks"
    | "stdoutSize"
    | "stderrSize"
    | "bufferExceeded"
    | "cleanup"
  >,
  maxBuffer: number
): void {
  // One listener factory serves both channels; only the context keys differ.
  const track = (
    source: PassThrough,
    sizeKey: "stdoutSize" | "stderrSize",
    chunksKey: "stdoutChunks" | "stderrChunks"
  ): void => {
    source.on("data", (piece: Buffer) => {
      // After an overflow on either channel, nothing more is recorded.
      if (context.bufferExceeded.value) {
        return;
      }

      const projected = context[sizeKey] + piece.length;
      if (projected > maxBuffer) {
        // The offending chunk is dropped rather than partially stored.
        context.bufferExceeded.value = true;
        context.cleanup();
        return;
      }

      context[sizeKey] = projected;
      context[chunksKey].push(piece);
    });
  };

  track(context.stdoutStream, "stdoutSize", "stdoutChunks");
  track(context.stderrStream, "stderrSize", "stderrChunks");
}
/**
 * Start the exec and build the stream context used to capture its output.
 *
 * The exec is started with `hijack: true` and `stdin: false`. Two fresh
 * `PassThrough` streams are created as stdout/stderr targets, buffer tracking
 * is attached to them, and only then is the raw stream demultiplexed into them
 * via `modem.demuxStream`.
 *
 * @param exec - Docker exec instance to start
 * @param modem - Docker modem providing `demuxStream`
 * @param maxBuffer - Maximum buffer size in bytes, forwarded to `setupBufferTracking`
 * @returns Context holding the raw stream, the two output streams, empty
 *          buffers/counters, the overflow flag, and a cleanup function
 */
export async function createStreamContext(
  exec: Docker.Exec,
  modem: ReturnType<Docker["getContainer"]>["modem"],
  maxBuffer: number
): Promise<ExecStreamContext> {
  const stream = await exec.start({ hijack: true, stdin: false });
  const stdoutStream = new PassThrough();
  const stderrStream = new PassThrough();

  const context: ExecStreamContext = {
    stream,
    stdoutStream,
    stderrStream,
    stdoutChunks: [],
    stderrChunks: [],
    stdoutSize: 0,
    stderrSize: 0,
    bufferExceeded: { value: false },
    cleanup: () => {
      // Each stream is torn down independently so one failure cannot
      // prevent the remaining streams from being destroyed.
      for (const target of [stream, stdoutStream, stderrStream]) {
        try {
          target.destroy();
        } catch {
          /* Intentionally swallow errors - stream already destroyed or errored */
        }
      }
    },
  };

  // Listeners go on before demuxing starts writing into the output streams.
  setupBufferTracking(context, maxBuffer);

  // Split the multiplexed exec stream into its stdout and stderr parts.
  modem.demuxStream(stream, stdoutStream, stderrStream);

  return context;
}
/**
 * Wire the caller's callbacks to the exec streams.
 *
 * - `error` on the raw stream or either output stream calls `handlers.onError`.
 * - `data` on either output stream calls `handlers.checkBufferExceeded`.
 * - `end` on the raw stream calls `handlers.onSuccess`, unless the overflow
 *   flag is set, in which case `handlers.onError` receives a buffer-limit error.
 *
 * @param context - Raw stream, output streams, and the shared overflow flag
 * @param handlers - Callbacks for failure, success, and the per-chunk overflow check
 * @param maxBuffer - Maximum buffer size in bytes (used in the error message only)
 */
export function attachExecutionHandlers(
  context: Pick<ExecStreamContext, "stream" | "stdoutStream" | "stderrStream" | "bufferExceeded">,
  handlers: {
    onError: (err: Error) => void;
    onSuccess: () => void;
    checkBufferExceeded: () => void;
  },
  maxBuffer: number
): void {
  const outputStreams: NodeJS.ReadableStream[] = [context.stdoutStream, context.stderrStream];
  const allStreams: NodeJS.ReadableStream[] = [context.stream, ...outputStreams];

  // A failure on any of the three streams is reported the same way.
  for (const source of allStreams) {
    source.on("error", handlers.onError);
  }

  // Re-evaluate the overflow flag whenever output arrives on either channel.
  for (const source of outputStreams) {
    source.on("data", handlers.checkBufferExceeded);
  }

  // The raw stream ending marks completion of the exec.
  context.stream.on("end", () => {
    if (!context.bufferExceeded.value) {
      handlers.onSuccess();
      return;
    }
    handlers.onError(new Error(`Buffer limit exceeded: output exceeded ${maxBuffer} bytes`));
  });
}
/**
 * Build a success/rejection handler pair that can settle at most once.
 *
 * Both handlers share a single guard. The first call to either one marks the
 * pair as settled, runs `cleanup`, and then calls `resolve` or `reject`.
 * Every later call to either handler does nothing.
 *
 * @param cleanup - Invoked once, immediately before the promise is settled
 * @param resolve - Promise resolve function
 * @param reject - Promise reject function
 * @returns Handlers that settle the promise exactly once
 */
export function createSettlementHandlers(
  cleanup: () => void,
  resolve: () => void,
  reject: (error: Error) => void
): SettlementHandlers {
  let done = false;

  // Shared once-only gate: flip the guard, clean up, then settle.
  const settleOnce = (finish: () => void): void => {
    if (done) {
      return;
    }
    done = true;
    cleanup();
    finish();
  };

  return {
    settleWithRejection: (error: Error): void => {
      settleOnce(() => reject(error));
    },
    settleWithSuccess: (): void => {
      settleOnce(resolve);
    },
  };
}
/**
 * Wait for an exec stream to finish, enforcing a timeout and the buffer limit.
 *
 * The returned promise settles exactly once, and on every path the timer is
 * cleared and `context.cleanup()` is called first:
 * - resolves when the raw stream emits `end` and no overflow was recorded;
 * - rejects when any stream emits `error`;
 * - rejects with a buffer-limit error when the overflow flag is set;
 * - rejects with a timeout error when `timeout` milliseconds elapse first.
 *
 * If the overflow flag is already set when this function is called, the
 * promise is rejected immediately with the buffer-limit error. In that state
 * the streams have already been destroyed, so no further `data` or `end`
 * event would arrive and the call would otherwise only fail later with a
 * misleading timeout error.
 *
 * @param context - Stream context from createStreamContext
 * @param timeout - Timeout in milliseconds
 * @param maxBuffer - Maximum buffer size in bytes (used in the error message)
 * @returns Promise that resolves when the stream completes successfully
 * @throws Error on timeout, buffer exceeded, or stream error
 */
export async function executeWithTimeout(
  context: Pick<
    ExecStreamContext,
    "stream" | "stdoutStream" | "stderrStream" | "bufferExceeded" | "cleanup"
  >,
  timeout: number,
  maxBuffer: number
): Promise<void> {
  let timeoutId: ReturnType<typeof setTimeout> | null = null;

  // Runs once before settlement: stop the timer, then tear down the streams.
  const cleanup = (): void => {
    if (timeoutId !== null) {
      clearTimeout(timeoutId);
      timeoutId = null;
    }
    context.cleanup();
  };

  const bufferLimitError = (): Error =>
    new Error(`Buffer limit exceeded: output exceeded ${maxBuffer} bytes`);

  return new Promise<void>((resolve, reject) => {
    const { settleWithRejection, settleWithSuccess } = createSettlementHandlers(
      cleanup,
      resolve,
      reject
    );

    // Set up timeout
    timeoutId = setTimeout(() => {
      settleWithRejection(new Error(`Exec timeout: command exceeded ${timeout}ms limit`));
    }, timeout);

    // Attach stream event handlers
    attachExecutionHandlers(
      context,
      {
        onError: settleWithRejection,
        onSuccess: settleWithSuccess,
        checkBufferExceeded: () => {
          if (context.bufferExceeded.value) {
            settleWithRejection(bufferLimitError());
          }
        },
      },
      maxBuffer
    );

    // The overflow may have been recorded before the listeners above existed.
    // Checked after attaching so the `error` listeners stay in place; the
    // settlement guard turns any later event into a no-op.
    if (context.bufferExceeded.value) {
      settleWithRejection(bufferLimitError());
    }
  });
}