Skip to main content
Glama
ingpoc

Token-Efficient MCP Server

by ingpoc

batch_process_csv

Process multiple CSV files simultaneously with consistent filtering and column selection to achieve significant token savings compared to individual file processing.

Instructions

Process multiple CSV files in a single call with consistent filtering. Achieves 80% token savings for multiple files vs individual calls.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
file_pathsYesPaths to CSV files (max 5)
filter_exprNoFilter expression applied to all files
columnsNoColumns to select from all files
limitNoMaximum rows per file
aggregateNoIf true, combine results from all files into aggregated summary

Implementation Reference

  • Main handler for batch_process_csv tool. Processes multiple CSV files (max 5) with consistent filtering and optional aggregation. Calls executeProcessCSV for each file, aggregates results, and returns token metrics with estimated 80% savings compared to individual file calls.
    private async batchProcessCSV(args: {
      file_paths: string[];
      filter_expr?: string;
      columns?: string[];
      limit?: number;
      aggregate?: boolean;
    }): Promise<{ content: any[] }> {
      const startTime = Date.now();
      const filePaths = args.file_paths.slice(0, 5); // Max 5 files
      const results: any[] = [];
    
      for (const filePath of filePaths) {
        try {
          const singleArgs = {
            file_path: filePath,
            filter_expr: args.filter_expr,
            columns: args.columns,
            limit: args.limit || 100,
            offset: 0,
            response_format: 'summary'
          };
    
          const innerResult = await this.executeProcessCSV(singleArgs);
          results.push({
            file_path: filePath,
            success: true,
            result: innerResult.result
          });
        } catch (error: any) {
          results.push({
            file_path: filePath,
            success: false,
            error: error.message
          });
        }
      }
    
      const processingTime = Date.now() - startTime;
      const successful = results.filter(r => r.success);
    
      let finalResult: any;
    
      if (args.aggregate) {
        // Aggregate results
        finalResult = {
          success: true,
          batch_summary: {
            total_files: filePaths.length,
            successful: successful.length,
            failed: results.length - successful.length,
            processing_time_ms: processingTime
          },
          aggregated_data: {
            total_rows: successful.reduce((sum, r) => sum + (r.result?.total_rows || 0), 0),
            filtered_rows: successful.reduce((sum, r) => sum + (r.result?.filtered_rows || 0), 0),
            returned_rows: successful.reduce((sum, r) => sum + (r.result?.returned_rows || 0), 0),
          },
          per_file_results: results
        };
      } else {
        finalResult = {
          success: true,
          total_files: filePaths.length,
          successful: successful.length,
          failed: results.length - successful.length,
          processing_time_ms: processingTime,
          results
        };
      }
    
      return {
        content: [{
          type: 'text',
          text: JSON.stringify({
            ...finalResult,
            token_metrics: {
              input_tokens: this.estimateTokens(JSON.stringify(args)),
              output_tokens: this.estimateTokens(JSON.stringify(finalResult)),
              result_size_bytes: this.getResultSizeBytes(finalResult),
              processing_time_ms: processingTime,
              cached: false,
              cache_hit: false,
              estimated_savings_percent: '80',
              note: 'Batch processing saves ~80% vs individual file calls'
            }
          }, null, 2)
        }]
      };
    }
  • Tool registration schema defining batch_process_csv input parameters: file_paths (array of max 5), filter_expr, columns, limit, and aggregate flag. Includes description emphasizing 80% token savings for multiple files.
    name: 'batch_process_csv',
    title: 'Batch Process Multiple CSV Files',
    description: 'Process multiple CSV files in a single call with consistent filtering. Achieves 80% token savings for multiple files vs individual calls.',
    inputSchema: {
      type: 'object',
      properties: {
        file_paths: { type: 'array', items: { type: 'string' }, description: 'Paths to CSV files (max 5)', minItems: 1, maxItems: 5 },
        filter_expr: { type: 'string', description: 'Filter expression applied to all files' },
        columns: { type: 'array', items: { type: 'string' }, description: 'Columns to select from all files' },
        limit: { type: 'number', default: 100, description: 'Maximum rows per file' },
        aggregate: { type: 'boolean', default: false, description: 'If true, combine results from all files into aggregated summary' },
      },
      required: ['file_paths'],
      $schema: 'https://json-schema.org/draft/2020-12/schema',
    },
  • src/index.ts:104-105 (registration)
    Switch case routing tool call 'batch_process_csv' to the batchProcessCSV handler method.
    case 'batch_process_csv':
      return await this.batchProcessCSV(args as any);
  • executeProcessCSV helper method that performs actual CSV processing using Python/pandas. Supports caching, filtering, column selection, aggregation, and pagination. Generates dynamic Python code for each file.
      private async executeProcessCSV(args: any): Promise<ProcessResult> {
        const cacheKey = ResultCache.generateKey('process_csv', args);
        const cached = this.cache.get(cacheKey);
    
        if (cached) {
          const cachedResult = cached.result as ProcessResult;
          // Don't return cached errors
          if (!cachedResult.success) {
            return cachedResult;
          }
          return {
            success: true,
            result: {
              ...(cachedResult.result || {}),
              cached: true,
              cache_hit: true
            },
            token_metrics: {
              input_tokens: cachedResult.token_metrics?.input_tokens || 0,
              output_tokens: cachedResult.token_metrics?.output_tokens || 0,
              result_size_bytes: cachedResult.token_metrics?.result_size_bytes || 0,
              processing_time_ms: cachedResult.token_metrics?.processing_time_ms || 0,
              cached: true,
              cache_hit: true,
              estimated_savings_percent: '90'
            }
          };
        }
    
        const startTime = Date.now();
        const offset = args.offset || 0;
        const limit = args.limit || 100;
        const aggregateBy = args.aggregate_by || '';
        const aggFunc = args.agg_func || 'mean';
    
        const pythonCode = `
    import pandas as pd
    import json
    import sys
    
    try:
        # Read CSV
        df = pd.read_csv('${args.file_path}')
        total_rows = len(df)
    
        # Apply filters
        if '${args.filter_expr || ''}':
            df = df.query('${args.filter_expr}')
    
        filtered_count = len(df)
    
        # Select columns
        if ${JSON.stringify(args.columns || [])}:
            df = df[${JSON.stringify(args.columns)}]
    
        # Handle aggregation (groupby)
        if '${aggregateBy}':
            agg_func = '${aggFunc}'
            grouped = df.groupby('${aggregateBy}', dropna=False)
    
            result = {
                'total_rows': total_rows,
                'filtered_rows': filtered_count,
                'grouped': True,
                'group_by_column': '${aggregateBy}',
                'aggregation_function': agg_func,
                'num_groups': len(grouped),
                'groups': {}
            }
    
            for group_name, group_df in grouped:
                group_stats = {}
                numeric_cols = group_df.select_dtypes(include=['number']).columns
    
                for col in numeric_cols:
                    if agg_func == 'count':
                        group_stats[col] = len(group_df[col].dropna())
                    elif agg_func == 'sum':
                        group_stats[col] = float(group_df[col].sum())
                    elif agg_func == 'mean':
                        group_stats[col] = float(group_df[col].mean())
                    elif agg_func == 'min':
                        group_stats[col] = float(group_df[col].min())
                    elif agg_func == 'max':
                        group_stats[col] = float(group_df[col].max())
    
                result['groups'][str(group_name)] = {
                    'count': len(group_df),
                    'stats': group_stats
                }
    
            print(json.dumps(result))
        else:
            # Apply offset and limit (pagination)
            df = df.iloc[${offset}:${offset + limit}]
    
            result = {
                'total_rows': total_rows,
                'filtered_rows': filtered_count,
                'returned_rows': len(df),
                'offset': ${offset},
                'limit': ${limit},
                'has_more': ${offset + limit} < filtered_count,
                'columns': list(df.columns),
            }
    
            if '${args.response_format || 'summary'}' == 'summary':
                numeric_cols = df.select_dtypes(include=['number']).columns.tolist()
                if len(numeric_cols) > 0:
                    result['data_summary'] = df[numeric_cols].describe().to_dict()
                result['sample_data'] = df.head(5).to_dict('records')
            else:
                result['data'] = df.to_dict('records')
    
            print(json.dumps(result))
    
    except Exception as e:
        print(json.dumps({
            'success': False,
            'error': str(e)
        }))
    `;
    
        const result = await this.executePython(pythonCode);
        const processingTime = Date.now() - startTime;
    
        // Only cache successful results
  • Utility methods estimateTokens and getResultSizeBytes used by batch_process_csv to calculate token metrics and result sizes for performance tracking.
    private estimateTokens(text: string): number {
      return Math.ceil(text.length / 4);
    }
    
    /**
     * Calculate result size in bytes
     */
    private getResultSizeBytes(result: any): number {
      return JSON.stringify(result).length;
    }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ingpoc/token-efficient-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server