import * as fsp from 'node:fs/promises';
import * as path from 'node:path';
import type { Stats } from 'node:fs';
import type { FileHandle } from 'node:fs/promises';
import { Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import type { FileType } from '../config.js';
import {
BINARY_CHECK_BUFFER_SIZE,
KNOWN_BINARY_EXTENSIONS,
MAX_TEXT_FILE_SIZE,
PARALLEL_CONCURRENCY,
} from './constants.js';
import { ErrorCode, McpError } from './errors.js';
import { validateExistingPath } from './path-validation.js';
/**
 * Builds a plain `Error` whose `name` is `AbortError`. Used whenever an abort
 * must be reported but the signal did not provide an `Error` reason of its own.
 */
function createAbortError(message = 'Operation aborted'): Error {
  return Object.assign(new Error(message), { name: 'AbortError' });
}
/**
 * Throws if `signal` is already aborted; returns normally otherwise (including
 * when no signal is given).
 *
 * An `Error` stored as the signal's `reason` is rethrown unchanged so the
 * original cause survives. Any other reason is replaced by a generic
 * `AbortError` built from `message` (or the default message).
 */
export function assertNotAborted(signal?: AbortSignal, message?: string): void {
  if (signal?.aborted !== true) {
    return;
  }
  const abortReason: unknown = (signal as { reason?: unknown }).reason;
  throw abortReason instanceof Error ? abortReason : createAbortError(message);
}
/**
 * Returns the error describing `signal`'s abort: its `reason` when that is an
 * `Error`, otherwise a fresh generic `AbortError` carrying `message`.
 */
function getAbortError(signal: AbortSignal, message?: string): Error {
  const abortReason: unknown = (signal as { reason?: unknown }).reason;
  return abortReason instanceof Error ? abortReason : createAbortError(message);
}
/**
 * Races `promise` against `signal`: settles like `promise`, unless the signal
 * aborts first, in which case it rejects with the signal's abort error.
 *
 * Aborting does not cancel the underlying operation. It keeps running, and any
 * value it produces afterwards is discarded. Callers wrapping
 * resource-producing operations (e.g. opening a file) must release such late
 * values themselves.
 *
 * @param promise Operation to wait for.
 * @param signal Optional abort signal; without one, `promise` is returned as-is.
 * @returns A promise that always settles asynchronously, even when `signal` is
 *   already aborted. Non-`Error` rejection values from `promise` are wrapped
 *   in an `Error`.
 */
export function withAbort<T>(
  promise: Promise<T>,
  signal?: AbortSignal
): Promise<T> {
  if (!signal) return promise;
  if (signal.aborted) {
    // Return a rejected promise rather than throwing synchronously from a
    // Promise-returning function. Also observe the abandoned promise so a
    // later rejection of it cannot surface as an unhandled rejection.
    void promise.catch(() => undefined);
    return Promise.reject(getAbortError(signal));
  }
  return new Promise<T>((resolve, reject) => {
    const onAbort = (): void => {
      reject(getAbortError(signal));
    };
    signal.addEventListener('abort', onAbort, { once: true });
    // Once aborted, the outer promise is already rejected; these late
    // resolve/reject calls are no-ops, but the listener is still detached.
    void promise.then(
      (value) => {
        signal.removeEventListener('abort', onAbort);
        resolve(value);
      },
      (error: unknown) => {
        signal.removeEventListener('abort', onAbort);
        reject(error instanceof Error ? error : new Error(String(error)));
      }
    );
  });
}
/**
 * Returns a signal that aborts when `baseSignal` aborts or when `timeoutMs`
 * elapses, plus a `cleanup` the caller should invoke once the work is done.
 *
 * - No usable timeout (undefined, 0, NaN, +/-Infinity): `baseSignal` is passed
 *   through unchanged, or a never-aborting signal is returned if there is none.
 * - A base signal plus a timeout that `AbortSignal.timeout()` accepts, on a
 *   runtime with `AbortSignal.any()`: the two native signals are combined.
 * - Anything else: a manual AbortController driven by a listener and a timer.
 */
export function createTimedAbortSignal(
  baseSignal: AbortSignal | undefined,
  timeoutMs?: number
): { signal: AbortSignal; cleanup: () => void } {
  if (
    typeof timeoutMs !== 'number' ||
    !Number.isFinite(timeoutMs) ||
    timeoutMs === 0
  ) {
    return baseSignal ? createForwardedSignal(baseSignal) : createNoopSignal();
  }
  // AbortSignal.timeout() throws for delays that are not in-range integers
  // (e.g. 1500.5), so only hand it values it accepts. Everything else takes
  // the setTimeout-based path instead of crashing. The upper bound is Node's
  // timer ceiling (2^31 - 1 ms).
  const maxNativeDelayMs = 2147483647;
  const nativeTimeoutOk =
    Number.isInteger(timeoutMs) &&
    timeoutMs > 0 &&
    timeoutMs <= maxNativeDelayMs;
  if (nativeTimeoutOk && shouldUseAbortAny(baseSignal)) {
    return createAnySignal(baseSignal, timeoutMs);
  }
  return createManualSignal(baseSignal, timeoutMs);
}
/** Returns a signal that never aborts, paired with a cleanup that does nothing. */
function createNoopSignal(): { signal: AbortSignal; cleanup: () => void } {
  const { signal } = new AbortController();
  const cleanup = (): void => undefined;
  return { signal, cleanup };
}
/** Hands back `baseSignal` untouched; there is nothing to release afterwards. */
function createForwardedSignal(baseSignal: AbortSignal): {
  signal: AbortSignal;
  cleanup: () => void;
} {
  const cleanup = (): void => undefined;
  return { signal: baseSignal, cleanup };
}
/**
 * Type guard that is true when a base signal is present and the runtime
 * provides `AbortSignal.any` to merge it with a timeout signal.
 */
function shouldUseAbortAny(
  baseSignal: AbortSignal | undefined
): baseSignal is AbortSignal {
  if (baseSignal === undefined) {
    return false;
  }
  return typeof AbortSignal.any === 'function';
}
/**
 * Merges `baseSignal` with a native `AbortSignal.timeout(timeoutMs)` signal.
 * The result aborts as soon as either source does. Both are platform-managed,
 * so the returned cleanup is a no-op.
 */
function createAnySignal(
  baseSignal: AbortSignal,
  timeoutMs: number
): { signal: AbortSignal; cleanup: () => void } {
  const signal = AbortSignal.any([baseSignal, AbortSignal.timeout(timeoutMs)]);
  return { signal, cleanup: (): void => undefined };
}
/**
 * Fallback combinator built on a private AbortController. It aborts when
 * `baseSignal` aborts (forwarding the reason only if it is an `Error`) or when
 * the timer armed by createTimeout fires.
 *
 * The returned `cleanup` detaches the forwarding listener and cancels any
 * pending timer. It must be called once the guarded work finishes.
 */
function createManualSignal(
  baseSignal: AbortSignal | undefined,
  timeoutMs: number | undefined
): { signal: AbortSignal; cleanup: () => void } {
  const controller = new AbortController();

  function forwardAbort(): void {
    const baseReason: unknown = baseSignal?.reason;
    // Non-Error reasons are dropped so the controller uses its default reason.
    controller.abort(baseReason instanceof Error ? baseReason : undefined);
  }

  if (baseSignal?.aborted) {
    forwardAbort();
  } else if (baseSignal) {
    baseSignal.addEventListener('abort', forwardAbort, { once: true });
  }

  const timeoutId = createTimeout(controller, timeoutMs);

  return {
    signal: controller.signal,
    cleanup: (): void => {
      baseSignal?.removeEventListener('abort', forwardAbort);
      if (timeoutId) clearTimeout(timeoutId);
    },
  };
}
/**
 * Schedules `controller.abort()` with an `AbortError` ("Operation timed out")
 * after `timeoutMs` milliseconds.
 *
 * @returns The timer handle (for `clearTimeout`), or `undefined` when
 *   `timeoutMs` is missing or not finite, in which case no timer is armed.
 */
function createTimeout(
  controller: AbortController,
  timeoutMs: number | undefined
): ReturnType<typeof setTimeout> | undefined {
  // Node timers cannot represent delays above 2^31 - 1 ms (~24.8 days). A
  // larger value is replaced with 1 ms, which would make a very long timeout
  // fire almost immediately, so clamp to the maximum instead.
  const maxTimerDelayMs = 2147483647;
  if (typeof timeoutMs !== 'number' || !Number.isFinite(timeoutMs)) {
    return undefined;
  }
  const delay = Math.min(timeoutMs, maxTimerDelayMs);
  return setTimeout(() => {
    controller.abort(createAbortError('Operation timed out'));
  }, delay);
}
/** Outcome of processInParallel: fulfilled values plus per-item failures. */
interface ParallelResult<R> {
  /** Values from items whose processor fulfilled, in completion order. */
  results: Array<R>;
  /** One entry per rejected item, keyed by the item's input index. */
  errors: Array<{ index: number; error: Error }>;
}
/** Mutable bookkeeping shared by the processInParallel helpers. */
interface ParallelState<T, R> {
  /** Input items, processed in index order. */
  items: T[];
  /** Per-item worker. */
  processor: (item: T) => Promise<R>;
  /** Maximum number of tasks allowed in `inFlight` at once. */
  concurrency: number;
  /** Index of the next item that has not been started yet. */
  nextIndex: number;
  /** Promises of the tasks currently running. */
  inFlight: Set<Promise<void>>;
  /** Fulfilled values, appended as tasks complete. */
  results: R[];
  /** Rejections, keyed by input index. */
  errors: { index: number; error: Error }[];
  /** Set once the associated abort signal fires; stops new tasks starting. */
  aborted: boolean;
}
/**
 * Builds the error thrown by processInParallel when its signal aborts.
 *
 * Delegates to the module's shared createAbortError instead of duplicating its
 * body, so every abort error in this file is constructed the same way.
 */
function createParallelAbortError(): Error {
  return createAbortError('Operation aborted');
}
/**
 * Creates fresh processInParallel bookkeeping. `aborted` starts as true if
 * `signal` has already fired, so no task will ever be started in that case.
 */
function createState<T, R>(
  items: T[],
  processor: (item: T) => Promise<R>,
  concurrency: number,
  signal?: AbortSignal
): ParallelState<T, R> {
  const alreadyAborted = signal?.aborted === true;
  const state: ParallelState<T, R> = {
    items,
    processor,
    concurrency,
    results: [],
    errors: [],
    nextIndex: 0,
    aborted: alreadyAborted,
    inFlight: new Set<Promise<void>>(),
  };
  return state;
}
/**
 * Flags `state.aborted` when `signal` fires and returns a function that
 * detaches the listener. With no signal, or one that has already fired, no
 * listener is added and a no-op detacher is returned.
 */
function attachAbortListener<T, R>(
  state: ParallelState<T, R>,
  signal?: AbortSignal
): () => void {
  if (!signal || signal.aborted) {
    return (): void => undefined;
  }
  const markAborted = (): void => {
    state.aborted = true;
  };
  signal.addEventListener('abort', markAborted, { once: true });
  return (): void => signal.removeEventListener('abort', markAborted);
}
/**
 * Returns a promise that resolves (never rejects) once `signal` aborts. It is
 * already resolved if the signal has fired, and `undefined` when there is no
 * signal.
 *
 * NOTE(review): the listener is only removed when the signal fires, so a
 * long-lived signal that never aborts keeps it (and this promise) reachable.
 */
function createAbortPromise(signal?: AbortSignal): Promise<void> | undefined {
  if (!signal) {
    return undefined;
  }
  return signal.aborted
    ? Promise.resolve()
    : new Promise<void>((resolve) => {
        signal.addEventListener('abort', () => resolve(), { once: true });
      });
}
/** True when the run is not aborted, a concurrency slot is free, and items remain. */
function canStartNext<T, R>(state: ParallelState<T, R>): boolean {
  const hasCapacity = state.inFlight.size < state.concurrency;
  const hasPending = state.nextIndex < state.items.length;
  return !state.aborted && hasCapacity && hasPending;
}
/**
 * Starts processing `item` and returns a promise that always fulfils.
 *
 * A fulfilled value is appended to `state.results`. A rejection (or synchronous
 * throw) is recorded in `state.errors` under `index`, wrapped in an `Error`
 * when it is not one already.
 */
function createTask<T, R>(
  item: T,
  index: number,
  state: ParallelState<T, R>
): Promise<void> {
  const run = async (): Promise<void> => {
    try {
      const value = await state.processor(item);
      state.results.push(value);
    } catch (caught) {
      const error = caught instanceof Error ? caught : new Error(String(caught));
      state.errors.push({ index, error });
    }
  };
  return run();
}
/**
 * Claims the next unstarted index, starts its task, and tracks the task in
 * `state.inFlight` until it settles. Does nothing once every index has been
 * claimed.
 */
function queueNextTask<T, R>(state: ParallelState<T, R>): void {
  const index = state.nextIndex;
  state.nextIndex += 1;
  // Bounds-check the index instead of testing the element for `undefined`:
  // `T` may legitimately include `undefined`, and such items must still be
  // processed rather than silently dropped (no result and no error).
  if (index >= state.items.length) return;
  // Safe: index is within bounds. The assertion only satisfies
  // noUncheckedIndexedAccess.
  const item = state.items[index] as T;
  const task = createTask(item, index, state);
  state.inFlight.add(task);
  void task.finally(() => {
    state.inFlight.delete(task);
  });
}
/** Starts items until the concurrency limit is hit, the run aborts, or no items remain. */
function startNextTasks<T, R>(state: ParallelState<T, R>): void {
  for (let more = canStartNext(state); more; more = canStartNext(state)) {
    queueNextTask(state);
  }
}
/**
 * Fills the pool, then repeatedly waits for any in-flight task (or the abort
 * promise) to settle and tops the pool back up.
 *
 * Returns once nothing is in flight, or as soon as the run is flagged as
 * aborted. Tasks still running at that point are not awaited. Task promises
 * never reject (createTask catches), so the race only ever fulfils.
 */
async function drainTasks<T, R>(
  state: ParallelState<T, R>,
  abortPromise?: Promise<void>
): Promise<void> {
  startNextTasks(state);
  for (;;) {
    if (state.inFlight.size === 0 || state.aborted) {
      return;
    }
    const contenders: Promise<void>[] = [...state.inFlight];
    if (abortPromise) {
      contenders.push(abortPromise);
    }
    await Promise.race(contenders);
    startNextTasks(state);
  }
}
/**
 * Runs `processor` over `items` with at most `concurrency` calls in flight.
 *
 * Failures do not stop the run. Each rejection is recorded in `errors` with
 * the index of the item that produced it. Fulfilled values are appended to
 * `results` in completion order, not input order.
 *
 * @param items Inputs to process. An empty array returns immediately.
 * @param processor Async worker invoked once per item.
 * @param concurrency Maximum simultaneous calls. Values below 1 (including
 *   NaN) are treated as 1, since they would otherwise start no work at all.
 * @param signal Optional cancellation. After it aborts, no new items are
 *   started; already-running calls are not awaited.
 * @throws Error named `AbortError` if `signal` aborts during the run, or was
 *   already aborted and `items` is non-empty.
 */
export async function processInParallel<T, R>(
  items: T[],
  processor: (item: T) => Promise<R>,
  concurrency: number = PARALLEL_CONCURRENCY,
  signal?: AbortSignal
): Promise<ParallelResult<R>> {
  const limit = concurrency >= 1 ? concurrency : 1;
  const state = createState(items, processor, limit, signal);
  if (items.length === 0) {
    return { results: state.results, errors: state.errors };
  }
  // Race against a call-scoped relay signal rather than `signal` itself.
  // createAbortPromise's listener is never removed if its signal never aborts.
  // On a long-lived caller signal it would pin the abort promise, and every
  // Promise.race reaction attached to it, forever. The relay becomes garbage
  // when this call returns, and its single forwarding listener is removed in
  // the `finally` below.
  const relay = new AbortController();
  const forwardAbort = (): void => {
    relay.abort();
  };
  if (signal?.aborted) {
    relay.abort();
  } else {
    signal?.addEventListener('abort', forwardAbort, { once: true });
  }
  const abortPromise = signal ? createAbortPromise(relay.signal) : undefined;
  const detachAbort = attachAbortListener(state, signal);
  try {
    await drainTasks(state, abortPromise);
  } finally {
    detachAbort();
    signal?.removeEventListener('abort', forwardAbort);
  }
  if (state.aborted) {
    throw createParallelAbortError();
  }
  return { results: state.results, errors: state.errors };
}
/**
 * Classifies `stats` as 'file', 'directory', 'symlink' or 'other', checked in
 * that order. 'symlink' can only result from lstat()-style stats, since stat()
 * follows links.
 */
export function getFileType(stats: Stats): FileType {
  return stats.isFile()
    ? 'file'
    : stats.isDirectory()
      ? 'directory'
      : stats.isSymbolicLink()
        ? 'symlink'
        : 'other';
}
/** True for dot-prefixed names (Unix hidden-file convention). */
export function isHidden(name: string): boolean {
  return name.charAt(0) === '.';
}
/** True when the (lower-cased) extension of `filePath` is in KNOWN_BINARY_EXTENSIONS. */
function hasKnownBinaryExtension(filePath: string): boolean {
  return KNOWN_BINARY_EXTENSIONS.has(path.extname(filePath).toLowerCase());
}
/**
 * Runs `fn` with a read-only handle for `filePath`.
 *
 * When `existingHandle` is supplied it is used directly and left open, since
 * the caller owns it. Otherwise `filePath` is checked with
 * `validateExistingPath`, opened read-only, passed to `fn`, and closed
 * afterwards. Close failures are logged rather than thrown.
 *
 * @param filePath Path to open when no handle is supplied.
 * @param fn Work to perform with the handle; its result is returned.
 * @param existingHandle Optional already-open handle to reuse.
 * @param signal Optional abort signal for validation and open().
 */
async function withFileHandle<T>(
  filePath: string,
  fn: (handle: fsp.FileHandle) => Promise<T>,
  existingHandle?: fsp.FileHandle,
  signal?: AbortSignal
): Promise<T> {
  if (existingHandle) {
    return fn(existingHandle);
  }
  const effectivePath = await validateExistingPath(filePath, signal);
  const opening = fsp.open(effectivePath, 'r');
  let handle: fsp.FileHandle;
  try {
    handle = await withAbort(opening, signal);
  } catch (error) {
    // withAbort rejects as soon as the signal fires, without waiting for
    // open() to settle. If the open still succeeds afterwards, close the
    // orphaned handle instead of leaking its descriptor.
    void opening.then((orphan) => orphan.close()).catch(() => undefined);
    throw error;
  }
  try {
    return await fn(handle);
  } finally {
    await handle.close().catch((error: unknown) => {
      console.error('Failed to close file handle:', error);
    });
  }
}
/**
 * Reads up to BINARY_CHECK_BUFFER_SIZE bytes from offset 0 of `handle` and
 * returns just the bytes actually read. An empty file yields an empty buffer.
 */
async function readProbe(
  handle: fsp.FileHandle,
  signal?: AbortSignal
): Promise<Buffer> {
  const probe = Buffer.allocUnsafe(BINARY_CHECK_BUFFER_SIZE);
  const readResult = await withAbort(
    handle.read(probe, 0, BINARY_CHECK_BUFFER_SIZE, 0),
    signal
  );
  const count = readResult.bytesRead;
  return count === 0 ? Buffer.alloc(0) : probe.subarray(0, count);
}
/** True when `slice` starts with the UTF-8 byte-order mark EF BB BF. */
function hasUtf8Bom(slice: Buffer): boolean {
  if (slice.length < 3) {
    return false;
  }
  return slice[0] === 0xef && slice[1] === 0xbb && slice[2] === 0xbf;
}
/** True when `slice` starts with a UTF-16 BOM, either little-endian (FF FE) or big-endian (FE FF). */
function hasUtf16Bom(slice: Buffer): boolean {
  if (slice.length < 2) {
    return false;
  }
  const first = slice[0];
  const second = slice[1];
  const littleEndian = first === 0xff && second === 0xfe;
  const bigEndian = first === 0xfe && second === 0xff;
  return littleEndian || bigEndian;
}
/**
 * Heuristically decides whether `filePath` is binary.
 *
 * A known binary extension short-circuits to true without touching the file.
 * Otherwise the first bytes are sniffed (via `existingHandle` if given, else a
 * freshly opened handle) and judged by isBinarySlice.
 */
export async function isProbablyBinary(
  filePath: string,
  existingHandle?: fsp.FileHandle,
  signal?: AbortSignal
): Promise<boolean> {
  if (hasKnownBinaryExtension(filePath)) return true;
  const sniff = async (handle: fsp.FileHandle): Promise<boolean> =>
    isBinarySlice(await readProbe(handle, signal));
  return withFileHandle(filePath, sniff, existingHandle, signal);
}
/**
 * Binary heuristic for a probe buffer: empty input or a UTF-8/UTF-16 BOM means
 * text; otherwise any NUL byte means binary.
 */
function isBinarySlice(slice: Buffer): boolean {
  if (slice.length === 0 || hasUtf8Bom(slice) || hasUtf16Bom(slice)) {
    return false;
  }
  return slice.indexOf(0) !== -1;
}
/** How a file was read: the whole file ('full') or only its first lines ('head'). */
export type ReadMode = 'full' | 'head';
/** Caller-facing options for readFile / readFileWithStats. Every field is optional. */
export interface ReadFileOptions {
  /** When set, read only the first `head` lines instead of the whole file. */
  head?: number;
  /** Byte limit; defaults to MAX_TEXT_FILE_SIZE and can never exceed it. */
  maxSize?: number;
  /** Decoding for the file contents; defaults to 'utf-8'. */
  encoding?: BufferEncoding;
  /** When true, files that look binary are rejected before reading; defaults to false. */
  skipBinary?: boolean;
  /** Cancels the read when aborted. */
  signal?: AbortSignal;
}
/**
 * ReadFileOptions after defaults are applied: `encoding`, `maxSize`, and
 * `skipBinary` are always present. `head` and `signal` exist only if given.
 */
export interface NormalizedOptions {
  encoding: BufferEncoding;
  maxSize: number;
  skipBinary: boolean;
  head?: number;
  signal?: AbortSignal;
}
/** Result of reading a text file in either 'full' or 'head' mode. */
export interface ReadFileResult {
  /** The validated path that was actually read. */
  path: string;
  /** Decoded file text (or the first lines of it in 'head' mode). */
  content: string;
  /** Which read mode produced this result. */
  readMode: ReadMode;
  /** 'full' mode: always false. 'head' mode: mirrors `hasMoreLines`. */
  truncated: boolean;
  /** Line count of the whole file; only set in 'full' mode. */
  totalLines?: number;
  /** Requested line count; only set in 'head' mode. */
  head?: number;
  /** Number of lines present in `content`. */
  linesRead?: number;
  /** Whether the file continues beyond what is in `content`. */
  hasMoreLines?: boolean;
}
/**
 * Applies defaults to caller options: 'utf-8' encoding, skipBinary false, and
 * a maxSize capped at MAX_TEXT_FILE_SIZE. `head` and `signal` are copied only
 * when supplied, so absent options stay absent rather than `undefined`.
 */
function normalizeOptions(options: ReadFileOptions): NormalizedOptions {
  const { head, signal } = options;
  const requestedMax = options.maxSize ?? MAX_TEXT_FILE_SIZE;
  return {
    encoding: options.encoding ?? 'utf-8',
    maxSize: Math.min(requestedMax, MAX_TEXT_FILE_SIZE),
    skipBinary: options.skipBinary ?? false,
    ...(head !== undefined ? { head } : {}),
    ...(signal ? { signal } : {}),
  };
}
/** 'head' when a head line count was requested, otherwise 'full'. */
function resolveReadMode(options: NormalizedOptions): ReadMode {
  return options.head === undefined ? 'full' : 'head';
}
/** highWaterMark for full-file read streams: 64 KiB per chunk. */
const STREAM_CHUNK_SIZE = 2 ** 16;
/** Builds the E_TOO_LARGE error raised when streamed bytes exceed `maxSize`. */
function createTooLargeError(
  bytesRead: number,
  maxSize: number,
  requestedPath: string
): McpError {
  const message = `File exceeds maximum size (${bytesRead} > ${maxSize}): ${requestedPath}`;
  const details = { size: bytesRead, maxSize };
  return new McpError(ErrorCode.E_TOO_LARGE, message, requestedPath, details);
}
/**
 * Writable sink that buffers every chunk in memory.
 *
 * Once the running byte total exceeds `maxSize`, the write fails with the
 * E_TOO_LARGE error from createTooLargeError. That error reports the total
 * including the offending chunk, and the chunk itself is not kept.
 */
class BufferCollector extends Writable {
  readonly #chunks: Buffer[] = [];
  #totalSize = 0;
  readonly #maxSize: number;
  readonly #requestedPath: string;

  constructor(maxSize: number, requestedPath: string) {
    super({ autoDestroy: true });
    this.#maxSize = maxSize;
    this.#requestedPath = requestedPath;
  }

  override _write(
    chunk: Buffer | string,
    _encoding: BufferEncoding,
    callback: (error?: Error | null) => void
  ): void {
    const piece = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
    const runningTotal = this.#totalSize + piece.length;
    this.#totalSize = runningTotal;
    if (runningTotal > this.#maxSize) {
      callback(
        createTooLargeError(runningTotal, this.#maxSize, this.#requestedPath)
      );
      return;
    }
    this.#chunks.push(piece);
    callback();
  }

  /** Concatenates every accepted chunk into a single Buffer. */
  getResult(): Buffer {
    return Buffer.concat(this.#chunks, this.#totalSize);
  }
}
/**
 * Streams `handle` from offset 0 into memory and rejects with E_TOO_LARGE as
 * soon as more than `maxSize` bytes arrive. The stream is created with
 * autoClose/emitClose disabled, so `handle` is left open for the caller.
 */
async function readFileBufferWithLimit(
  handle: FileHandle,
  maxSize: number,
  requestedPath: string,
  signal?: AbortSignal
): Promise<Buffer> {
  const source = handle.createReadStream({
    start: 0,
    highWaterMark: STREAM_CHUNK_SIZE,
    autoClose: false,
    emitClose: false,
  });
  const sink = new BufferCollector(maxSize, requestedPath);
  await pipeline(source, sink, { signal });
  const collected = sink.getResult();
  return collected;
}
/**
 * Reads up to `numLines` lines from `handle` via `handle.readLines()` and
 * returns them joined with '\n'.
 *
 * @param handle Open file handle to read from.
 * @param numLines Maximum lines to return. Values that are not positive (0,
 *   negatives, NaN) return an empty string without reading.
 * @param encoding Passed to `readLines`; defaults to 'utf-8'.
 * @param maxBytesRead Optional soft byte budget. After each line, an estimate
 *   (encoded line length + 1 for the newline) is accumulated, and reading
 *   stops once it reaches this value. The line that crosses the budget is kept.
 * @param signal Checked up front and forwarded to `readLines`.
 *
 * NOTE(review): `readLines` builds a read stream whose `autoClose` defaults to
 * true in Node, so `handle` may be closed once iteration ends. Confirm callers
 * do not reuse it afterwards.
 */
async function headFile(
  handle: fsp.FileHandle,
  numLines: number,
  encoding: BufferEncoding = 'utf-8',
  maxBytesRead?: number,
  signal?: AbortSignal
): Promise<string> {
  assertNotAborted(signal);
  // The loop checks the count only after pushing a line, so without this
  // guard a request for 0 (or fewer) lines would still return the first line.
  if (!(numLines > 0)) return '';
  const lines: string[] = [];
  let estimatedBytes = 0;
  for await (const line of handle.readLines({ encoding, signal })) {
    lines.push(line);
    if (lines.length >= numLines) break;
    if (maxBytesRead === undefined) continue;
    estimatedBytes += Buffer.byteLength(line, encoding) + 1;
    if (estimatedBytes >= maxBytesRead) break;
  }
  return lines.join('\n');
}
/**
 * Counts lines as (number of '\n') + 1 for non-empty text and 0 for ''.
 * A trailing newline therefore adds one more line.
 */
function countLines(content: string): number {
  if (content === '') return 0;
  let lines = 1;
  for (
    let at = content.indexOf('\n');
    at !== -1;
    at = content.indexOf('\n', at + 1)
  ) {
    lines += 1;
  }
  return lines;
}
/**
 * Reads the first `head` lines of `handle` and reports whether more follow.
 *
 * One extra line is requested and dropped before returning. Seeing that line
 * is what distinguishes "the file has exactly `head` lines" (hasMoreLines
 * false) from "the file continues" (true). Comparing the count to `head` alone
 * would flag every file of exactly `head` lines as truncated. If the soft byte
 * budget (`maxSize`) stops reading first, hasMoreLines reflects only what was
 * actually read.
 *
 * @param head Requested line count; negative or NaN values are treated as 0.
 * @returns `content` (joined with '\n'), `linesRead`, and `hasMoreLines`, with
 *   `truncated` mirroring `hasMoreLines`.
 */
async function readHeadContent(
  handle: FileHandle,
  head: number,
  options: { encoding: BufferEncoding; maxSize: number; signal?: AbortSignal }
): Promise<{
  content: string;
  truncated: boolean;
  linesRead: number;
  hasMoreLines: boolean;
}> {
  const limit = head > 0 ? head : 0;
  const probed = await headFile(
    handle,
    limit + 1,
    options.encoding,
    options.maxSize,
    options.signal
  );
  const probedLines = countLines(probed);
  const hasMoreLines = probedLines > limit;
  let content = probed;
  if (hasMoreLines) {
    // Line terminators are stripped by readLines, so the last '\n' in the
    // joined text is exactly the separator before the extra probe line.
    const cut = probed.lastIndexOf('\n');
    content = cut === -1 ? '' : probed.slice(0, cut);
  }
  const linesRead = hasMoreLines ? probedLines - 1 : probedLines;
  return {
    content,
    truncated: hasMoreLines,
    linesRead,
    hasMoreLines,
  };
}
/**
 * Reads the whole file (bounded by `maxSize`), decodes it with `encoding`, and
 * counts its lines.
 */
async function readFullContent(
  handle: FileHandle,
  encoding: BufferEncoding,
  maxSize: number,
  requestedPath: string,
  signal?: AbortSignal
): Promise<{ content: string; totalLines: number }> {
  const raw = await readFileBufferWithLimit(handle, maxSize, requestedPath, signal);
  const content = raw.toString(encoding);
  const totalLines = countLines(content);
  return { content, totalLines };
}
/**
 * Rejects with E_INVALID_INPUT when `validPath` looks binary. The error message
 * names the caller-supplied `filePath`.
 */
async function assertNotBinary(
  validPath: string,
  filePath: string,
  normalized: NormalizedOptions
): Promise<void> {
  const { signal } = normalized;
  assertNotAborted(signal);
  const looksBinary = await isProbablyBinary(validPath, undefined, signal);
  if (looksBinary) {
    throw new McpError(
      ErrorCode.E_INVALID_INPUT,
      `Binary file detected: ${filePath}. Set skipBinary=false to read as text.`,
      filePath
    );
  }
}
/** Returns the requested head line count, or throws E_INVALID_INPUT if absent. */
function requireHead(normalized: NormalizedOptions, filePath: string): number {
  const { head } = normalized;
  if (head !== undefined) {
    return head;
  }
  throw new McpError(ErrorCode.E_INVALID_INPUT, 'Missing head option', filePath);
}
/** Assembles the ReadFileResult for a 'head' read. Key order is kept stable for serialisation. */
function buildHeadResult(
  validPath: string,
  content: string,
  truncated: boolean,
  head: number,
  linesRead: number,
  hasMoreLines: boolean
): ReadFileResult {
  const result: ReadFileResult = {
    path: validPath,
    content,
    truncated,
    readMode: 'head',
    head,
    linesRead,
    hasMoreLines,
  };
  return result;
}
/**
 * Assembles the ReadFileResult for a 'full' read. The whole file was read, so
 * nothing is truncated and `linesRead` equals `totalLines`.
 */
function buildFullResult(
  validPath: string,
  content: string,
  totalLines: number
): ReadFileResult {
  const result: ReadFileResult = {
    path: validPath,
    content,
    truncated: false,
    totalLines,
    readMode: 'full',
    linesRead: totalLines,
    hasMoreLines: false,
  };
  return result;
}
/**
 * Throws E_TOO_LARGE, with a hint to use `head`, unless `size <= maxSize`.
 * A NaN on either side also throws.
 */
function assertSizeWithinLimit(
  size: number,
  maxSize: number,
  filePath: string
): void {
  const withinLimit = size <= maxSize;
  if (!withinLimit) {
    throw new McpError(
      ErrorCode.E_TOO_LARGE,
      `File too large: ${size} bytes (max: ${maxSize} bytes). Use head parameter to preview the first N lines.`,
      filePath,
      { size, maxSize }
    );
  }
}
/**
 * Performs a 'head' read. `signal` is forwarded only when present, so the
 * optional property is never set to `undefined`.
 */
async function readHeadResult(
  handle: FileHandle,
  validPath: string,
  filePath: string,
  normalized: NormalizedOptions
): Promise<ReadFileResult> {
  const head = requireHead(normalized, filePath);
  const { encoding, maxSize, signal } = normalized;
  const readOptions: Parameters<typeof readHeadContent>[2] = signal
    ? { encoding, maxSize, signal }
    : { encoding, maxSize };
  const outcome = await readHeadContent(handle, head, readOptions);
  return buildHeadResult(
    validPath,
    outcome.content,
    outcome.truncated,
    head,
    outcome.linesRead,
    outcome.hasMoreLines
  );
}
/**
 * Performs a 'full' read. It rejects early based on `stats.size`. The streamed
 * read enforces `maxSize` again on the bytes actually read, in case the stats
 * are stale.
 */
async function readFullResult(
  handle: FileHandle,
  validPath: string,
  filePath: string,
  stats: Stats,
  normalized: NormalizedOptions
): Promise<ReadFileResult> {
  const { encoding, maxSize, signal } = normalized;
  assertSizeWithinLimit(stats.size, maxSize, filePath);
  const full = await readFullContent(handle, encoding, maxSize, filePath, signal);
  return buildFullResult(validPath, full.content, full.totalLines);
}
/** Dispatches to the 'head' or 'full' reader according to the normalized options. */
async function readByMode(
  handle: FileHandle,
  validPath: string,
  filePath: string,
  stats: Stats,
  normalized: NormalizedOptions
): Promise<ReadFileResult> {
  switch (resolveReadMode(normalized)) {
    case 'head':
      return await readHeadResult(handle, validPath, filePath, normalized);
    default:
      return await readFullResult(handle, validPath, filePath, stats, normalized);
  }
}
/** Throws E_NOT_FILE unless `stats` describes a regular file. */
function assertFileStats(filePath: string, stats: Stats): void {
  if (stats.isFile()) return;
  throw new McpError(ErrorCode.E_NOT_FILE, `Not a file: ${filePath}`, filePath);
}
/**
 * Shared read pipeline behind readFile and readFileWithStats.
 *
 * Steps:
 * 1. Check for abort and confirm `stats` is a regular file.
 * 2. Optionally reject binary files.
 * 3. Open `validPath` read-only and read it in the resolved mode.
 * 4. Always close the handle afterwards.
 *
 * @param filePath Caller-supplied path, used in error messages.
 * @param validPath Already-validated path that is actually opened.
 * @param stats Stats for `validPath`.
 * @param normalized Options with defaults applied.
 */
async function readFileWithStatsInternal(
  filePath: string,
  validPath: string,
  stats: Stats,
  normalized: NormalizedOptions
): Promise<ReadFileResult> {
  assertNotAborted(normalized.signal);
  assertFileStats(filePath, stats);
  if (normalized.skipBinary) {
    await assertNotBinary(validPath, filePath, normalized);
  }
  assertNotAborted(normalized.signal);
  const opening = fsp.open(validPath, 'r');
  let handle: FileHandle;
  try {
    handle = await withAbort(opening, normalized.signal);
  } catch (error) {
    // withAbort rejects on abort without waiting for open(). If the open
    // succeeds later, close that orphaned handle instead of leaking it.
    void opening.then((orphan) => orphan.close()).catch(() => undefined);
    throw error;
  }
  try {
    return await readByMode(handle, validPath, filePath, stats, normalized);
  } finally {
    // Same policy as withFileHandle: a failing close() on this read-only
    // handle is logged instead of replacing the read's result or its error.
    await handle.close().catch((error: unknown) => {
      console.error('Failed to close file handle:', error);
    });
  }
}
/**
 * Reads a file whose path has already been validated and stat'ed by the
 * caller. `filePath` is used only in error messages; `validPath` is what gets
 * opened.
 */
export async function readFileWithStats(
  filePath: string,
  validPath: string,
  stats: Stats,
  options: ReadFileOptions = {}
): Promise<ReadFileResult> {
  const normalized = normalizeOptions(options);
  // Fail fast if the caller has already cancelled.
  assertNotAborted(normalized.signal);
  const result = await readFileWithStatsInternal(
    filePath,
    validPath,
    stats,
    normalized
  );
  return result;
}
/**
 * Validates `filePath`, stats it, and reads it as text ('full' by default, or
 * 'head' when `options.head` is set). The signal is re-checked between the
 * asynchronous steps.
 */
export async function readFile(
  filePath: string,
  options: ReadFileOptions = {}
): Promise<ReadFileResult> {
  const normalized = normalizeOptions(options);
  const { signal } = normalized;
  assertNotAborted(signal);
  const validPath = await validateExistingPath(filePath, signal);
  assertNotAborted(signal);
  const stats = await withAbort(fsp.stat(validPath), signal);
  const result = await readFileWithStatsInternal(
    filePath,
    validPath,
    stats,
    normalized
  );
  return result;
}
// headFile is declared without `export` where it is defined; expose it here so
// callers holding an open FileHandle can read just the first lines of a file.
export { headFile };