# CSV Processing Examples

Efficient processing of large CSV files with the Large File MCP Server.

## Overview

CSV files can grow to millions of rows in data processing pipelines. This guide demonstrates memory-efficient patterns for CSV analysis, validation, and transformation.

## Basic CSV Operations

### Read CSV with Headers

Parse CSV with automatic header detection:

```typescript
async function readCsvWithHeaders(csvFile: string) {
  // Read first chunk to get headers
  const firstChunk = await read_large_file_chunk({
    filePath: csvFile,
    chunkIndex: 0
  });

  const lines = firstChunk.content.split("\n");
  const headers = lines[0].split(",").map(h => h.trim());

  console.log("CSV Headers:", headers);
  console.log("Total rows:", firstChunk.totalLines - 1); // Exclude header

  // Read first few data rows
  const dataRows = lines.slice(1, 6);

  console.log("\nFirst 5 rows:");
  dataRows.forEach((row, idx) => {
    const values = row.split(",");
    console.log(`Row ${idx + 1}:`);
    headers.forEach((header, i) => {
      console.log(`  ${header}: ${values[i]}`);
    });
    console.log();
  });
}

await readCsvWithHeaders("/data/sales.csv");
```

### Stream CSV Processing

Process large CSV files row-by-row:

```typescript
async function processCsv(csvFile: string) {
  const stream = stream_large_file({
    filePath: csvFile,
    chunkSize: 1000
  });

  let headers: string[] = [];
  let isFirstChunk = true;
  let totalRows = 0;

  for await (const chunk of stream) {
    const lines = chunk.lines.filter(line => line.trim());

    // Extract headers from first chunk
    if (isFirstChunk) {
      headers = lines[0].split(",").map(h => h.trim());
      lines.shift(); // Remove header row
      isFirstChunk = false;
    }

    // Process data rows
    for (const line of lines) {
      const values = line.split(",");
      const row = Object.fromEntries(
        headers.map((header, i) => [header, values[i]])
      );

      await processRow(row);
      totalRows++;
    }

    console.log(`Processed ${totalRows} rows...`);
  }

  console.log(`\nComplete! Total rows: ${totalRows}`);
}

async function processRow(row: Record<string, string>) {
  // Your processing logic here
  // e.g., validation, transformation, database insert
}

await processCsv("/data/transactions.csv");
```

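The examples in this guide split rows with a plain `line.split(",")`, which is fine for simple exports but breaks on quoted fields that contain commas (for example `"Doe, Jane"`). If your data may contain quoted values, a small quote-aware splitter can replace `split(",")` wherever it appears. The sketch below is an illustration rather than part of the server's API, and it handles only basic quoting (embedded commas and doubled quotes), not newlines inside quoted fields:

```typescript
// Minimal quote-aware CSV field splitter (illustrative helper, not part of the server API).
// Handles commas inside double-quoted fields and doubled quotes ("") as an escaped quote.
// It does not handle newlines inside quoted fields.
function splitCsvLine(line: string): string[] {
  const fields: string[] = [];
  let current = "";
  let inQuotes = false;

  for (let i = 0; i < line.length; i++) {
    const char = line[i];

    if (char === '"') {
      if (inQuotes && line[i + 1] === '"') {
        current += '"'; // Escaped quote inside a quoted field
        i++;
      } else {
        inQuotes = !inQuotes; // Toggle quoted state
      }
    } else if (char === "," && !inQuotes) {
      fields.push(current); // Field boundary outside quotes
      current = "";
    } else {
      current += char;
    }
  }

  fields.push(current);
  return fields;
}

// Usage: swap `line.split(",")` for `splitCsvLine(line)` in the examples above.
// splitCsvLine('1,"Doe, Jane",jane@example.com') → ["1", "Doe, Jane", "jane@example.com"]
```
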
## Data Validation

### Validate CSV Structure

Check for structural issues:

```typescript
async function validateCsvStructure(csvFile: string) {
  const stream = stream_large_file({
    filePath: csvFile,
    chunkSize: 1000
  });

  let headers: string[] = [];
  let isFirstChunk = true;
  let errors: Array<{ row: number; issue: string }> = [];
  let currentRow = 0;

  for await (const chunk of stream) {
    const lines = chunk.lines.filter(line => line.trim());

    if (isFirstChunk) {
      headers = lines[0].split(",");
      lines.shift();
      isFirstChunk = false;
    }

    for (const line of lines) {
      currentRow++;
      const values = line.split(",");

      // Check column count
      if (values.length !== headers.length) {
        errors.push({
          row: currentRow,
          issue: `Column count mismatch: expected ${headers.length}, got ${values.length}`
        });
      }

      // Check for empty values
      values.forEach((value, idx) => {
        if (!value.trim()) {
          errors.push({
            row: currentRow,
            issue: `Empty value in column "${headers[idx]}"`
          });
        }
      });

      // Limit error collection
      if (errors.length >= 100) break;
    }

    if (errors.length >= 100) break;
  }

  // Report results
  if (errors.length === 0) {
    console.log("✓ CSV structure is valid");
  } else {
    console.log(`✗ Found ${errors.length} issues:\n`);
    errors.slice(0, 20).forEach(error => {
      console.log(`Row ${error.row}: ${error.issue}`);
    });
    if (errors.length > 20) {
      console.log(`\n... and ${errors.length - 20} more issues`);
    }
  }
}

await validateCsvStructure("/data/import.csv");
```

### Data Type Validation

Validate data types in columns:

```typescript
interface ColumnSchema {
  name: string;
  type: "string" | "number" | "date" | "email";
  required?: boolean;
}

async function validateDataTypes(
  csvFile: string,
  schema: ColumnSchema[]
) {
  const stream = stream_large_file({
    filePath: csvFile,
    chunkSize: 1000
  });

  let headers: string[] = [];
  let isFirstChunk = true;
  let errors: string[] = [];
  let currentRow = 0;

  const validators = {
    number: (val: string) => !isNaN(parseFloat(val)),
    date: (val: string) => !isNaN(Date.parse(val)),
    email: (val: string) => /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(val),
    string: () => true
  };

  for await (const chunk of stream) {
    const lines = chunk.lines.filter(line => line.trim());

    if (isFirstChunk) {
      headers = lines[0].split(",").map(h => h.trim());
      lines.shift();
      isFirstChunk = false;
    }

    for (const line of lines) {
      currentRow++;
      const values = line.split(",");

      schema.forEach((column, idx) => {
        const value = values[idx]?.trim();

        // Check required
        if (column.required && !value) {
          errors.push(
            `Row ${currentRow}: Missing required value in "${column.name}"`
          );
          return;
        }

        // Check type
        if (value && !validators[column.type](value)) {
          errors.push(
            `Row ${currentRow}: Invalid ${column.type} in "${column.name}": "${value}"`
          );
        }
      });

      if (errors.length >= 100) break;
    }

    if (errors.length >= 100) break;
  }

  // Report
  if (errors.length === 0) {
    console.log("✓ All data types are valid");
  } else {
    console.log(`✗ Found ${errors.length} validation errors:\n`);
    errors.slice(0, 20).forEach(error => console.log(error));
    if (errors.length > 20) {
      console.log(`\n... and ${errors.length - 20} more errors`);
    }
  }
}

// Usage
await validateDataTypes("/data/users.csv", [
  { name: "id", type: "number", required: true },
  { name: "email", type: "email", required: true },
  { name: "created_at", type: "date", required: true },
  { name: "age", type: "number" }
]);
```

## Data Analysis

### Calculate Statistics

Compute statistical summary:

```typescript
async function calculateStats(csvFile: string, columnName: string) {
  const stream = stream_large_file({
    filePath: csvFile,
    chunkSize: 1000
  });

  let headers: string[] = [];
  let isFirstChunk = true;
  let columnIndex = -1;
  const values: number[] = [];

  for await (const chunk of stream) {
    const lines = chunk.lines.filter(line => line.trim());

    if (isFirstChunk) {
      headers = lines[0].split(",").map(h => h.trim());
      columnIndex = headers.indexOf(columnName);

      if (columnIndex === -1) {
        throw new Error(`Column "${columnName}" not found`);
      }

      lines.shift();
      isFirstChunk = false;
    }

    for (const line of lines) {
      const cols = line.split(",");
      const value = parseFloat(cols[columnIndex]);

      if (!isNaN(value)) {
        values.push(value);
      }
    }
  }

  // Calculate statistics
  values.sort((a, b) => a - b);

  const sum = values.reduce((a, b) => a + b, 0);
  const mean = sum / values.length;
  const median = values[Math.floor(values.length / 2)];
  const min = values[0];
  const max = values[values.length - 1];

  // Standard deviation
  const squaredDiffs = values.map(v => Math.pow(v - mean, 2));
  const variance = squaredDiffs.reduce((a, b) => a + b, 0) / values.length;
  const stdDev = Math.sqrt(variance);

  console.log(`=== Statistics for "${columnName}" ===\n`);
  console.log(`Count: ${values.length}`);
  console.log(`Mean: ${mean.toFixed(2)}`);
  console.log(`Median: ${median.toFixed(2)}`);
  console.log(`Std Dev: ${stdDev.toFixed(2)}`);
  console.log(`Min: ${min.toFixed(2)}`);
  console.log(`Max: ${max.toFixed(2)}`);
  console.log(`Range: ${(max - min).toFixed(2)}`);
}

await calculateStats("/data/sales.csv", "amount");
```

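`calculateStats` above collects every numeric value into an array so it can sort for the median, so for files with hundreds of millions of rows the array itself can become the memory bottleneck. If you only need count, mean, standard deviation, min, and max, a single pass with constant memory is enough. The sketch below is an assumption-level variant (the `calculateStreamingStats` name is hypothetical) that uses Welford's online algorithm and the same `stream_large_file` pattern as the other examples:

```typescript
// Single-pass statistics (no median) using Welford's online algorithm.
// Memory use stays constant regardless of file size.
async function calculateStreamingStats(csvFile: string, columnName: string) {
  const stream = stream_large_file({ filePath: csvFile, chunkSize: 1000 });

  let headers: string[] = [];
  let isFirstChunk = true;
  let columnIndex = -1;

  let count = 0;
  let mean = 0;
  let m2 = 0; // Running sum of squared deviations from the mean
  let min = Infinity;
  let max = -Infinity;

  for await (const chunk of stream) {
    const lines = chunk.lines.filter(line => line.trim());

    if (isFirstChunk) {
      headers = lines[0].split(",").map(h => h.trim());
      columnIndex = headers.indexOf(columnName);
      if (columnIndex === -1) throw new Error(`Column "${columnName}" not found`);
      lines.shift();
      isFirstChunk = false;
    }

    for (const line of lines) {
      const value = parseFloat(line.split(",")[columnIndex]);
      if (isNaN(value)) continue;

      // Welford update: incrementally adjust mean and squared deviations
      count++;
      const delta = value - mean;
      mean += delta / count;
      m2 += delta * (value - mean);

      if (value < min) min = value;
      if (value > max) max = value;
    }
  }

  const stdDev = count > 0 ? Math.sqrt(m2 / count) : 0;
  console.log(`Count: ${count}`);
  console.log(`Mean: ${mean.toFixed(2)}`);
  console.log(`Std Dev: ${stdDev.toFixed(2)}`);
  console.log(`Min: ${min.toFixed(2)}`);
  console.log(`Max: ${max.toFixed(2)}`);
}

await calculateStreamingStats("/data/sales.csv", "amount");
```
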
### Group By and Aggregate

Calculate aggregates by category:

```typescript
async function groupByAndSum(
  csvFile: string,
  groupColumn: string,
  sumColumn: string
) {
  const stream = stream_large_file({
    filePath: csvFile,
    chunkSize: 1000
  });

  let headers: string[] = [];
  let isFirstChunk = true;
  let groupIdx = -1;
  let sumIdx = -1;
  const groups = new Map<string, number>();

  for await (const chunk of stream) {
    const lines = chunk.lines.filter(line => line.trim());

    if (isFirstChunk) {
      headers = lines[0].split(",").map(h => h.trim());
      groupIdx = headers.indexOf(groupColumn);
      sumIdx = headers.indexOf(sumColumn);

      if (groupIdx === -1 || sumIdx === -1) {
        throw new Error("Column not found");
      }

      lines.shift();
      isFirstChunk = false;
    }

    for (const line of lines) {
      const cols = line.split(",");
      const groupKey = cols[groupIdx].trim();
      const value = parseFloat(cols[sumIdx]);

      if (!isNaN(value)) {
        groups.set(groupKey, (groups.get(groupKey) || 0) + value);
      }
    }
  }

  // Display results
  console.log(`=== ${sumColumn} by ${groupColumn} ===\n`);

  Array.from(groups.entries())
    .sort(([, a], [, b]) => b - a)
    .forEach(([key, sum]) => {
      console.log(`${key}: ${sum.toFixed(2)}`);
    });
}

await groupByAndSum("/data/sales.csv", "category", "amount");
```

## Data Transformation

### Filter and Export

Filter rows and write to new file:

```typescript
import * as fs from "node:fs"; // Node.js fs module for the output stream

async function filterCsv(
  inputFile: string,
  outputFile: string,
  predicate: (row: Record<string, string>) => boolean
) {
  const stream = stream_large_file({
    filePath: inputFile,
    chunkSize: 1000
  });

  const writer = fs.createWriteStream(outputFile);
  let headers: string[] = [];
  let isFirstChunk = true;
  let filteredCount = 0;

  for await (const chunk of stream) {
    const lines = chunk.lines.filter(line => line.trim());

    if (isFirstChunk) {
      headers = lines[0].split(",").map(h => h.trim());
      writer.write(lines[0] + "\n"); // Write headers
      lines.shift();
      isFirstChunk = false;
    }

    for (const line of lines) {
      const values = line.split(",");
      const row = Object.fromEntries(
        headers.map((h, i) => [h, values[i]])
      );

      if (predicate(row)) {
        writer.write(line + "\n");
        filteredCount++;
      }
    }
  }

  writer.end();
  console.log(`Filtered ${filteredCount} rows to ${outputFile}`);
}

// Usage: Export only completed transactions
await filterCsv(
  "/data/transactions.csv",
  "/data/completed.csv",
  row => row.status === "completed"
);
```

### Transform Columns

Apply transformations to columns:

```typescript
import * as fs from "node:fs"; // Node.js fs module for the output stream

async function transformCsv(
  inputFile: string,
  outputFile: string,
  transforms: Record<string, (value: string) => string>
) {
  const stream = stream_large_file({
    filePath: inputFile,
    chunkSize: 1000
  });

  const writer = fs.createWriteStream(outputFile);
  let headers: string[] = [];
  let isFirstChunk = true;

  for await (const chunk of stream) {
    const lines = chunk.lines.filter(line => line.trim());

    if (isFirstChunk) {
      headers = lines[0].split(",").map(h => h.trim());
      writer.write(lines[0] + "\n");
      lines.shift();
      isFirstChunk = false;
    }

    for (const line of lines) {
      const values = line.split(",");
      const transformed = values.map((value, idx) => {
        const header = headers[idx];
        if (transforms[header]) {
          return transforms[header](value);
        }
        return value;
      });

      writer.write(transformed.join(",") + "\n");
    }
  }

  writer.end();
}

// Usage: Normalize data
await transformCsv(
  "/data/users.csv",
  "/data/users_normalized.csv",
  {
    email: email => email.toLowerCase(),
    name: name => name.trim().toUpperCase(),
    created_at: date => new Date(date).toISOString()
  }
);
```

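Both `filterCsv` and `transformCsv` ignore the boolean returned by `writer.write`, so on very large outputs the write stream's internal buffer can grow while the input keeps streaming. If that becomes a concern, writes can be awaited whenever the stream signals backpressure. This is a general Node.js stream pattern, sketched here with a hypothetical `writeLine` helper rather than anything provided by the MCP server:

```typescript
import * as fs from "node:fs";
import { once } from "node:events";

// Write a line and wait for 'drain' if the stream's buffer is full.
// Drop-in replacement for the bare `writer.write(line + "\n")` calls above.
async function writeLine(writer: fs.WriteStream, line: string): Promise<void> {
  if (!writer.write(line + "\n")) {
    await once(writer, "drain"); // Pause until the buffer has been flushed
  }
}

// Usage inside the streaming loop:
//   if (predicate(row)) {
//     await writeLine(writer, line);
//     filteredCount++;
//   }
```
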
## Data Quality

### Find Duplicates

Detect duplicate rows:

```typescript
async function findDuplicates(
  csvFile: string,
  keyColumns: string[]
) {
  const stream = stream_large_file({
    filePath: csvFile,
    chunkSize: 1000
  });

  let headers: string[] = [];
  let isFirstChunk = true;
  const seen = new Map<string, number[]>();
  let currentRow = 0;

  for await (const chunk of stream) {
    const lines = chunk.lines.filter(line => line.trim());

    if (isFirstChunk) {
      headers = lines[0].split(",").map(h => h.trim());
      lines.shift();
      isFirstChunk = false;
    }

    for (const line of lines) {
      currentRow++;
      const values = line.split(",");

      // Create key from specified columns
      const keyIndices = keyColumns.map(col => headers.indexOf(col));
      const key = keyIndices.map(idx => values[idx]).join("|");

      if (!seen.has(key)) {
        seen.set(key, []);
      }
      seen.get(key)!.push(currentRow);
    }
  }

  // Find duplicates
  const duplicates = Array.from(seen.entries())
    .filter(([, rows]) => rows.length > 1)
    .map(([key, rows]) => ({ key, rows }));

  console.log(`=== Duplicate Analysis ===\n`);
  console.log(`Total rows: ${currentRow}`);
  console.log(`Unique: ${seen.size}`);
  console.log(`Duplicates: ${duplicates.length}\n`);

  if (duplicates.length > 0) {
    console.log("Top duplicates:");
    duplicates
      .sort((a, b) => b.rows.length - a.rows.length)
      .slice(0, 10)
      .forEach(({ key, rows }) => {
        console.log(`${key}: ${rows.length} occurrences at rows ${rows.join(", ")}`);
      });
  }
}

await findDuplicates("/data/users.csv", ["email"]);
```

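`findDuplicates` stores every row number for every key, which is convenient for reporting but means memory grows with the full file even when duplicates are rare. If you only need to know which keys repeat and how often, keeping a single counter per unique key is lighter. A minimal sketch under that assumption (the `countDuplicateKeys` name is hypothetical), using the same streaming and header handling as the example above:

```typescript
// Count-only duplicate detection: one counter per unique key
// instead of a list of row numbers per key.
async function countDuplicateKeys(csvFile: string, keyColumns: string[]) {
  const stream = stream_large_file({ filePath: csvFile, chunkSize: 1000 });

  let headers: string[] = [];
  let isFirstChunk = true;
  let keyIndices: number[] = [];
  const counts = new Map<string, number>();

  for await (const chunk of stream) {
    const lines = chunk.lines.filter(line => line.trim());

    if (isFirstChunk) {
      headers = lines[0].split(",").map(h => h.trim());
      keyIndices = keyColumns.map(col => headers.indexOf(col));
      lines.shift();
      isFirstChunk = false;
    }

    for (const line of lines) {
      const values = line.split(",");
      const key = keyIndices.map(idx => values[idx]).join("|");
      counts.set(key, (counts.get(key) || 0) + 1);
    }
  }

  const duplicates = Array.from(counts.entries()).filter(([, n]) => n > 1);
  console.log(`Unique keys: ${counts.size}, duplicated keys: ${duplicates.length}`);
  duplicates
    .sort(([, a], [, b]) => b - a)
    .slice(0, 10)
    .forEach(([key, n]) => console.log(`${key}: ${n} occurrences`));
}

await countDuplicateKeys("/data/users.csv", ["email"]);
```
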
### Check Data Completeness

Analyze missing values:

```typescript
async function analyzeCompleteness(csvFile: string) {
  const stream = stream_large_file({
    filePath: csvFile,
    chunkSize: 1000
  });

  let headers: string[] = [];
  let isFirstChunk = true;
  const missing = new Map<string, number>();
  let totalRows = 0;

  for await (const chunk of stream) {
    const lines = chunk.lines.filter(line => line.trim());

    if (isFirstChunk) {
      headers = lines[0].split(",").map(h => h.trim());
      headers.forEach(h => missing.set(h, 0));
      lines.shift();
      isFirstChunk = false;
    }

    for (const line of lines) {
      totalRows++;
      const values = line.split(",");

      values.forEach((value, idx) => {
        if (!value.trim()) {
          const header = headers[idx];
          missing.set(header, (missing.get(header) || 0) + 1);
        }
      });
    }
  }

  // Report
  console.log(`=== Data Completeness ===\n`);
  console.log(`Total rows: ${totalRows}\n`);

  Array.from(missing.entries())
    .sort(([, a], [, b]) => b - a)
    .forEach(([column, count]) => {
      const percent = ((count / totalRows) * 100).toFixed(1);
      console.log(`${column}: ${count} missing (${percent}%)`);

      if (count / totalRows > 0.1) {
        console.log(`  ⚠ High missing rate`);
      }
    });
}

await analyzeCompleteness("/data/dataset.csv");
```

## Best Practices

### 1. Always Use Streaming for Large Files

```typescript
// Good: Streaming approach
async function processLargeCsv(file: string) {
  const stream = stream_large_file({ filePath: file, chunkSize: 1000 });
  for await (const chunk of stream) {
    await processChunk(chunk.lines);
  }
}

// Bad: Loading entire file
const allData = fs.readFileSync(file, "utf-8").split("\n");
```

### 2. Handle Headers Correctly

```typescript
let isFirstChunk = true;
let headers: string[] = [];

for await (const chunk of stream) {
  let lines = chunk.lines;

  if (isFirstChunk) {
    headers = lines[0].split(",");
    lines = lines.slice(1); // Remove header
    isFirstChunk = false;
  }

  // Process data rows
}
```

### 3. Validate Before Processing

```typescript
// Validate structure first
await validateCsvStructure(file);

// Then process
await processCsv(file);
```

## See Also

- [API Reference](/api/reference) - All available tools
- [stream_large_file](/api/stream) - Streaming documentation
- [Log Analysis](/examples/log-analysis) - Log processing examples
- [Performance Guide](/guide/performance) - Optimization tips
- [Best Practices](/guide/best-practices) - Usage recommendations
