Skip to main content
Glama

MaverickMCP

by wshobson
MIT License
165
  • Apple
data_chunking.py20 kB
""" Data chunking utilities for memory-efficient processing of large datasets. Provides streaming, batching, and generator-based approaches for handling large DataFrames. """ import logging import math from collections.abc import Callable, Generator from typing import Any, Literal import numpy as np import pandas as pd from maverick_mcp.utils.memory_profiler import ( force_garbage_collection, get_dataframe_memory_usage, memory_context, optimize_dataframe, ) logger = logging.getLogger(__name__) # Default chunk size configurations DEFAULT_CHUNK_SIZE_MB = 50.0 MAX_CHUNK_SIZE_MB = 200.0 MIN_ROWS_PER_CHUNK = 100 class DataChunker: """Advanced data chunking utility with multiple strategies.""" def __init__( self, chunk_size_mb: float = DEFAULT_CHUNK_SIZE_MB, min_rows_per_chunk: int = MIN_ROWS_PER_CHUNK, optimize_chunks: bool = True, auto_gc: bool = True, ): """Initialize data chunker. Args: chunk_size_mb: Target chunk size in megabytes min_rows_per_chunk: Minimum rows per chunk optimize_chunks: Whether to optimize chunk memory usage auto_gc: Whether to automatically run garbage collection """ self.chunk_size_mb = min(chunk_size_mb, MAX_CHUNK_SIZE_MB) self.chunk_size_bytes = int(self.chunk_size_mb * 1024 * 1024) self.min_rows_per_chunk = min_rows_per_chunk self.optimize_chunks = optimize_chunks self.auto_gc = auto_gc logger.debug( f"DataChunker initialized: {self.chunk_size_mb}MB chunks, " f"min {self.min_rows_per_chunk} rows" ) def estimate_chunk_size(self, df: pd.DataFrame) -> tuple[int, int]: """Estimate optimal chunk size for a DataFrame. Args: df: DataFrame to analyze Returns: Tuple of (rows_per_chunk, estimated_chunks) """ total_memory = df.memory_usage(deep=True).sum() memory_per_row = total_memory / len(df) if len(df) > 0 else 0 if memory_per_row == 0: return len(df), 1 # Calculate rows per chunk based on memory target rows_per_chunk = max( self.min_rows_per_chunk, int(self.chunk_size_bytes / memory_per_row) ) # Ensure we don't exceed the DataFrame size rows_per_chunk = min(rows_per_chunk, len(df)) estimated_chunks = math.ceil(len(df) / rows_per_chunk) logger.debug( f"Estimated chunking: {rows_per_chunk} rows/chunk, " f"{estimated_chunks} chunks total" ) return rows_per_chunk, estimated_chunks def chunk_by_rows( self, df: pd.DataFrame, rows_per_chunk: int = None ) -> Generator[pd.DataFrame, None, None]: """Chunk DataFrame by number of rows. Args: df: DataFrame to chunk rows_per_chunk: Rows per chunk (auto-estimated if None) Yields: DataFrame chunks """ if rows_per_chunk is None: rows_per_chunk, _ = self.estimate_chunk_size(df) total_chunks = math.ceil(len(df) / rows_per_chunk) logger.debug( f"Chunking {len(df)} rows into {total_chunks} chunks " f"of ~{rows_per_chunk} rows each" ) for i, start_idx in enumerate(range(0, len(df), rows_per_chunk)): end_idx = min(start_idx + rows_per_chunk, len(df)) chunk = df.iloc[start_idx:end_idx].copy() if self.optimize_chunks: chunk = optimize_dataframe(chunk) logger.debug( f"Yielding chunk {i + 1}/{total_chunks}: rows {start_idx}-{end_idx - 1}" ) yield chunk # Cleanup after yielding if self.auto_gc: del chunk if i % 5 == 0: # GC every 5 chunks force_garbage_collection() def chunk_by_memory(self, df: pd.DataFrame) -> Generator[pd.DataFrame, None, None]: """Chunk DataFrame by memory size. Args: df: DataFrame to chunk Yields: DataFrame chunks """ total_memory = df.memory_usage(deep=True).sum() if total_memory <= self.chunk_size_bytes: if self.optimize_chunks: df = optimize_dataframe(df) yield df return # Use row-based chunking with memory-based estimation yield from self.chunk_by_rows(df) def chunk_by_date( self, df: pd.DataFrame, freq: Literal["D", "W", "M", "Q", "Y"] = "M", date_column: str = None, ) -> Generator[pd.DataFrame, None, None]: """Chunk DataFrame by date periods. Args: df: DataFrame to chunk (must have datetime index or date_column) freq: Frequency for chunking (D=daily, W=weekly, M=monthly, etc.) date_column: Name of date column (uses index if None) Yields: DataFrame chunks by date periods """ if date_column: if date_column not in df.columns: raise ValueError(f"Date column '{date_column}' not found") elif not isinstance(df.index, pd.DatetimeIndex): raise ValueError( "DataFrame must have datetime index or specify date_column" ) # Group by period period_groups = df.groupby( pd.Grouper(key=date_column, freq=freq) if date_column else pd.Grouper(freq=freq) ) total_periods = len(period_groups) logger.debug(f"Chunking by {freq} periods: {total_periods} chunks") for i, (period, group) in enumerate(period_groups): if len(group) == 0: continue if self.optimize_chunks: group = optimize_dataframe(group) logger.debug( f"Yielding period chunk {i + 1}/{total_periods}: " f"{period} ({len(group)} rows)" ) yield group if self.auto_gc and i % 3 == 0: # GC every 3 periods force_garbage_collection() def process_in_chunks( self, df: pd.DataFrame, processor: Callable[[pd.DataFrame], Any], combiner: Callable[[list], Any] = None, chunk_method: Literal["rows", "memory", "date"] = "memory", **chunk_kwargs, ) -> Any: """Process DataFrame in chunks and combine results. Args: df: DataFrame to process processor: Function to apply to each chunk combiner: Function to combine results (default: list) chunk_method: Chunking method to use **chunk_kwargs: Additional arguments for chunking method Returns: Combined results """ results = [] # Select chunking method if chunk_method == "rows": chunk_generator = self.chunk_by_rows(df, **chunk_kwargs) elif chunk_method == "memory": chunk_generator = self.chunk_by_memory(df) elif chunk_method == "date": chunk_generator = self.chunk_by_date(df, **chunk_kwargs) else: raise ValueError(f"Unknown chunk method: {chunk_method}") with memory_context("chunk_processing"): for i, chunk in enumerate(chunk_generator): try: with memory_context(f"chunk_{i}"): result = processor(chunk) results.append(result) except Exception as e: logger.error(f"Error processing chunk {i}: {e}") raise # Combine results if combiner: return combiner(results) elif results and isinstance(results[0], pd.DataFrame): # Auto-combine DataFrames return pd.concat(results, ignore_index=True) else: return results class StreamingDataProcessor: """Streaming data processor for very large datasets.""" def __init__(self, chunk_size_mb: float = DEFAULT_CHUNK_SIZE_MB): """Initialize streaming processor. Args: chunk_size_mb: Chunk size in MB """ self.chunk_size_mb = chunk_size_mb self.chunker = DataChunker(chunk_size_mb=chunk_size_mb) def stream_from_csv( self, filepath: str, processor: Callable[[pd.DataFrame], Any], chunksize: int = None, **read_kwargs, ) -> Generator[Any, None, None]: """Stream process CSV file in chunks. Args: filepath: Path to CSV file processor: Function to process each chunk chunksize: Rows per chunk (auto-estimated if None) **read_kwargs: Additional arguments for pd.read_csv Yields: Processed results for each chunk """ # Estimate chunk size if not provided if chunksize is None: # Read a sample to estimate memory usage sample = pd.read_csv(filepath, nrows=1000, **read_kwargs) memory_per_row = sample.memory_usage(deep=True).sum() / len(sample) chunksize = max(100, int(self.chunker.chunk_size_bytes / memory_per_row)) del sample force_garbage_collection() logger.info(f"Streaming CSV with {chunksize} rows per chunk") chunk_reader = pd.read_csv(filepath, chunksize=chunksize, **read_kwargs) for i, chunk in enumerate(chunk_reader): with memory_context(f"csv_chunk_{i}"): # Optimize chunk if needed if self.chunker.optimize_chunks: chunk = optimize_dataframe(chunk) result = processor(chunk) yield result # Clean up del chunk if i % 5 == 0: force_garbage_collection() def stream_from_database( self, query: str, connection, processor: Callable[[pd.DataFrame], Any], chunksize: int = None, ) -> Generator[Any, None, None]: """Stream process database query results in chunks. Args: query: SQL query connection: Database connection processor: Function to process each chunk chunksize: Rows per chunk Yields: Processed results for each chunk """ if chunksize is None: chunksize = 10000 # Default for database queries logger.info(f"Streaming database query with {chunksize} rows per chunk") chunk_reader = pd.read_sql(query, connection, chunksize=chunksize) for i, chunk in enumerate(chunk_reader): with memory_context(f"db_chunk_{i}"): if self.chunker.optimize_chunks: chunk = optimize_dataframe(chunk) result = processor(chunk) yield result del chunk if i % 3 == 0: force_garbage_collection() def optimize_dataframe_dtypes( df: pd.DataFrame, aggressive: bool = False, categorical_threshold: float = 0.5 ) -> pd.DataFrame: """Optimize DataFrame data types for memory efficiency. Args: df: DataFrame to optimize aggressive: Use aggressive optimizations (may lose precision) categorical_threshold: Threshold for categorical conversion Returns: Optimized DataFrame """ logger.debug(f"Optimizing DataFrame dtypes: {df.shape}") initial_memory = df.memory_usage(deep=True).sum() df_opt = df.copy() for col in df_opt.columns: col_type = df_opt[col].dtype try: if col_type == "object": # Convert string columns to categorical if beneficial unique_count = df_opt[col].nunique() total_count = len(df_opt[col]) if unique_count / total_count < categorical_threshold: df_opt[col] = df_opt[col].astype("category") logger.debug(f"Converted {col} to categorical") elif "int" in str(col_type): # Downcast integers c_min = df_opt[col].min() c_max = df_opt[col].max() if c_min >= np.iinfo(np.int8).min and c_max <= np.iinfo(np.int8).max: df_opt[col] = df_opt[col].astype(np.int8) elif ( c_min >= np.iinfo(np.int16).min and c_max <= np.iinfo(np.int16).max ): df_opt[col] = df_opt[col].astype(np.int16) elif ( c_min >= np.iinfo(np.int32).min and c_max <= np.iinfo(np.int32).max ): df_opt[col] = df_opt[col].astype(np.int32) elif "float" in str(col_type) and col_type == "float64": # Downcast float64 to float32 if no precision loss if aggressive: # Check if conversion preserves data temp = df_opt[col].astype(np.float32) if np.allclose( df_opt[col].fillna(0), temp.fillna(0), rtol=1e-6, equal_nan=True ): df_opt[col] = temp logger.debug(f"Converted {col} to float32") except Exception as e: logger.debug(f"Could not optimize column {col}: {e}") continue final_memory = df_opt.memory_usage(deep=True).sum() memory_saved = initial_memory - final_memory if memory_saved > 0: logger.info( f"DataFrame optimization saved {memory_saved / (1024**2):.2f}MB " f"({memory_saved / initial_memory * 100:.1f}% reduction)" ) return df_opt def create_memory_efficient_dataframe( data: dict | list, optimize: bool = True, categorical_columns: list[str] = None ) -> pd.DataFrame: """Create a memory-efficient DataFrame from data. Args: data: Data to create DataFrame from optimize: Whether to optimize dtypes categorical_columns: Columns to convert to categorical Returns: Memory-optimized DataFrame """ with memory_context("creating_dataframe"): df = pd.DataFrame(data) if categorical_columns: for col in categorical_columns: if col in df.columns: df[col] = df[col].astype("category") if optimize: df = optimize_dataframe_dtypes(df) return df def batch_process_large_dataframe( df: pd.DataFrame, operation: Callable, batch_size: int = None, combine_results: bool = True, ) -> Any: """Process large DataFrame in batches to manage memory. Args: df: Large DataFrame to process operation: Function to apply to each batch batch_size: Size of each batch (auto-estimated if None) combine_results: Whether to combine batch results Returns: Combined results or list of batch results """ chunker = DataChunker() if batch_size: chunk_generator = chunker.chunk_by_rows(df, batch_size) else: chunk_generator = chunker.chunk_by_memory(df) results = [] with memory_context("batch_processing"): for i, batch in enumerate(chunk_generator): logger.debug(f"Processing batch {i + 1}") with memory_context(f"batch_{i}"): result = operation(batch) results.append(result) if combine_results and results: if isinstance(results[0], pd.DataFrame): return pd.concat(results, ignore_index=True) elif isinstance(results[0], int | float): return sum(results) elif isinstance(results[0], list): return [item for sublist in results for item in sublist] return results class LazyDataFrame: """Lazy evaluation wrapper for large DataFrames.""" def __init__(self, data_source: str | pd.DataFrame, chunk_size_mb: float = 50.0): """Initialize lazy DataFrame. Args: data_source: File path or DataFrame chunk_size_mb: Chunk size for processing """ self.data_source = data_source self.chunker = DataChunker(chunk_size_mb=chunk_size_mb) self._cached_info = None def get_info(self) -> dict[str, Any]: """Get DataFrame information without loading full data.""" if self._cached_info: return self._cached_info if isinstance(self.data_source, str): # Read just the header and a sample sample = pd.read_csv(self.data_source, nrows=100) total_rows = sum(1 for _ in open(self.data_source)) - 1 # Subtract header self._cached_info = { "columns": sample.columns.tolist(), "dtypes": sample.dtypes.to_dict(), "estimated_rows": total_rows, "sample_memory_mb": sample.memory_usage(deep=True).sum() / (1024**2), } else: self._cached_info = get_dataframe_memory_usage(self.data_source) return self._cached_info def apply_chunked(self, operation: Callable) -> Any: """Apply operation in chunks.""" if isinstance(self.data_source, str): processor = StreamingDataProcessor(self.chunker.chunk_size_mb) results = list(processor.stream_from_csv(self.data_source, operation)) else: results = self.chunker.process_in_chunks(self.data_source, operation) return results def to_optimized_dataframe(self) -> pd.DataFrame: """Load and optimize the full DataFrame.""" if isinstance(self.data_source, str): df = pd.read_csv(self.data_source) else: df = self.data_source.copy() return optimize_dataframe_dtypes(df) # Utility functions for common operations def chunked_concat( dataframes: list[pd.DataFrame], chunk_size: int = 10 ) -> pd.DataFrame: """Concatenate DataFrames in chunks to manage memory. Args: dataframes: List of DataFrames to concatenate chunk_size: Number of DataFrames to concat at once Returns: Concatenated DataFrame """ if not dataframes: return pd.DataFrame() if len(dataframes) <= chunk_size: return pd.concat(dataframes, ignore_index=True) # Process in chunks results = [] for i in range(0, len(dataframes), chunk_size): chunk = dataframes[i : i + chunk_size] with memory_context(f"concat_chunk_{i // chunk_size}"): result = pd.concat(chunk, ignore_index=True) results.append(result) # Clean up chunk for df in chunk: del df force_garbage_collection() # Final concatenation with memory_context("final_concat"): final_result = pd.concat(results, ignore_index=True) return final_result def memory_efficient_groupby( df: pd.DataFrame, group_col: str, agg_func: Callable, chunk_size_mb: float = 50.0 ) -> pd.DataFrame: """Perform memory-efficient groupby operations. Args: df: DataFrame to group group_col: Column to group by agg_func: Aggregation function chunk_size_mb: Chunk size in MB Returns: Aggregated DataFrame """ if group_col not in df.columns: raise ValueError(f"Group column '{group_col}' not found") chunker = DataChunker(chunk_size_mb=chunk_size_mb) results = [] def process_chunk(chunk): return chunk.groupby(group_col).apply(agg_func).reset_index() results = chunker.process_in_chunks(df, process_chunk) # Combine and re-aggregate results combined = pd.concat(results, ignore_index=True) final_result = combined.groupby(group_col).apply(agg_func).reset_index() return final_result

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/wshobson/maverick-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server