batch_process_csv
Process multiple CSV files simultaneously with consistent filtering and column selection to achieve significant token savings compared to individual file processing.
Instructions
Process multiple CSV files in a single call with consistent filtering. Achieves 80% token savings for multiple files vs individual calls.
Input Schema
JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| file_paths | Yes | Paths to CSV files (max 5) | |
| filter_expr | No | Filter expression applied to all files | |
| columns | No | Columns to select from all files | |
| limit | No | Maximum rows per file | 100 |
| aggregate | No | If true, combine results from all files into aggregated summary | false |
Implementation Reference
- src/index.ts:297-385 (handler) Main handler for batch_process_csv tool. Processes multiple CSV files (max 5) with consistent filtering and optional aggregation. Calls executeProcessCSV for each file, aggregates results, and returns token metrics with estimated 80% savings compared to individual file calls.
private async batchProcessCSV(args: { file_paths: string[]; filter_expr?: string; columns?: string[]; limit?: number; aggregate?: boolean; }): Promise<{ content: any[] }> { const startTime = Date.now(); const filePaths = args.file_paths.slice(0, 5); // Max 5 files const results: any[] = []; for (const filePath of filePaths) { try { const singleArgs = { file_path: filePath, filter_expr: args.filter_expr, columns: args.columns, limit: args.limit || 100, offset: 0, response_format: 'summary' }; const innerResult = await this.executeProcessCSV(singleArgs); results.push({ file_path: filePath, success: true, result: innerResult.result }); } catch (error: any) { results.push({ file_path: filePath, success: false, error: error.message }); } } const processingTime = Date.now() - startTime; const successful = results.filter(r => r.success); let finalResult: any; if (args.aggregate) { // Aggregate results finalResult = { success: true, batch_summary: { total_files: filePaths.length, successful: successful.length, failed: results.length - successful.length, processing_time_ms: processingTime }, aggregated_data: { total_rows: successful.reduce((sum, r) => sum + (r.result?.total_rows || 0), 0), filtered_rows: successful.reduce((sum, r) => sum + (r.result?.filtered_rows || 0), 0), returned_rows: successful.reduce((sum, r) => sum + (r.result?.returned_rows || 0), 0), }, per_file_results: results }; } else { finalResult = { success: true, total_files: filePaths.length, successful: successful.length, failed: results.length - successful.length, processing_time_ms: processingTime, results }; } return { content: [{ type: 'text', text: JSON.stringify({ ...finalResult, token_metrics: { input_tokens: this.estimateTokens(JSON.stringify(args)), output_tokens: this.estimateTokens(JSON.stringify(finalResult)), result_size_bytes: this.getResultSizeBytes(finalResult), processing_time_ms: processingTime, cached: false, cache_hit: false, estimated_savings_percent: '80', note: 'Batch processing 
saves ~80% vs individual file calls' } }, null, 2) }] }; } - src/index.ts:702-716 (schema)Tool registration schema defining batch_process_csv input parameters: file_paths (array of max 5), filter_expr, columns, limit, and aggregate flag. Includes description emphasizing 80% token savings for multiple files.
name: 'batch_process_csv', title: 'Batch Process Multiple CSV Files', description: 'Process multiple CSV files in a single call with consistent filtering. Achieves 80% token savings for multiple files vs individual calls.', inputSchema: { type: 'object', properties: { file_paths: { type: 'array', items: { type: 'string' }, description: 'Paths to CSV files (max 5)', minItems: 1, maxItems: 5 }, filter_expr: { type: 'string', description: 'Filter expression applied to all files' }, columns: { type: 'array', items: { type: 'string' }, description: 'Columns to select from all files' }, limit: { type: 'number', default: 100, description: 'Maximum rows per file' }, aggregate: { type: 'boolean', default: false, description: 'If true, combine results from all files into aggregated summary' }, }, required: ['file_paths'], $schema: 'https://json-schema.org/draft/2020-12/schema', }, - src/index.ts:104-105 (registration)Switch case routing tool call 'batch_process_csv' to the batchProcessCSV handler method.
case 'batch_process_csv': return await this.batchProcessCSV(args as any); - src/index.ts:724-850 (helper)executeProcessCSV helper method that performs actual CSV processing using Python/pandas. Supports caching, filtering, column selection, aggregation, and pagination. Generates dynamic Python code for each file.
private async executeProcessCSV(args: any): Promise<ProcessResult> { const cacheKey = ResultCache.generateKey('process_csv', args); const cached = this.cache.get(cacheKey); if (cached) { const cachedResult = cached.result as ProcessResult; // Don't return cached errors if (!cachedResult.success) { return cachedResult; } return { success: true, result: { ...(cachedResult.result || {}), cached: true, cache_hit: true }, token_metrics: { input_tokens: cachedResult.token_metrics?.input_tokens || 0, output_tokens: cachedResult.token_metrics?.output_tokens || 0, result_size_bytes: cachedResult.token_metrics?.result_size_bytes || 0, processing_time_ms: cachedResult.token_metrics?.processing_time_ms || 0, cached: true, cache_hit: true, estimated_savings_percent: '90' } }; } const startTime = Date.now(); const offset = args.offset || 0; const limit = args.limit || 100; const aggregateBy = args.aggregate_by || ''; const aggFunc = args.agg_func || 'mean'; const pythonCode = ` import pandas as pd import json import sys try: # Read CSV df = pd.read_csv('${args.file_path}') total_rows = len(df) # Apply filters if '${args.filter_expr || ''}': df = df.query('${args.filter_expr}') filtered_count = len(df) # Select columns if ${JSON.stringify(args.columns || [])}: df = df[${JSON.stringify(args.columns)}] # Handle aggregation (groupby) if '${aggregateBy}': agg_func = '${aggFunc}' grouped = df.groupby('${aggregateBy}', dropna=False) result = { 'total_rows': total_rows, 'filtered_rows': filtered_count, 'grouped': True, 'group_by_column': '${aggregateBy}', 'aggregation_function': agg_func, 'num_groups': len(grouped), 'groups': {} } for group_name, group_df in grouped: group_stats = {} numeric_cols = group_df.select_dtypes(include=['number']).columns for col in numeric_cols: if agg_func == 'count': group_stats[col] = len(group_df[col].dropna()) elif agg_func == 'sum': group_stats[col] = float(group_df[col].sum()) elif agg_func == 'mean': group_stats[col] = float(group_df[col].mean()) elif 
agg_func == 'min': group_stats[col] = float(group_df[col].min()) elif agg_func == 'max': group_stats[col] = float(group_df[col].max()) result['groups'][str(group_name)] = { 'count': len(group_df), 'stats': group_stats } print(json.dumps(result)) else: # Apply offset and limit (pagination) df = df.iloc[${offset}:${offset + limit}] result = { 'total_rows': total_rows, 'filtered_rows': filtered_count, 'returned_rows': len(df), 'offset': ${offset}, 'limit': ${limit}, 'has_more': ${offset + limit} < filtered_count, 'columns': list(df.columns), } if '${args.response_format || 'summary'}' == 'summary': numeric_cols = df.select_dtypes(include=['number']).columns.tolist() if len(numeric_cols) > 0: result['data_summary'] = df[numeric_cols].describe().to_dict() result['sample_data'] = df.head(5).to_dict('records') else: result['data'] = df.to_dict('records') print(json.dumps(result)) except Exception as e: print(json.dumps({ 'success': False, 'error': str(e) })) `; const result = await this.executePython(pythonCode); const processingTime = Date.now() - startTime; // Only cache successful results - src/index.ts:129-138 (helper)Utility methods estimateTokens and getResultSizeBytes used by batch_process_csv to calculate token metrics and result sizes for performance tracking.
/**
 * Rough token-count estimate for a string, using the common heuristic of
 * roughly 4 characters per token.
 */
private estimateTokens(text: string): number {
  const charsPerToken = 4;
  return Math.ceil(text.length / charsPerToken);
}

/**
 * Calculate result size in bytes.
 *
 * The size is measured as the character length of the JSON serialization of
 * the result object.
 */
private getResultSizeBytes(result: any): number {
  const serialized = JSON.stringify(result);
  return serialized.length;
}