group_data
Group and aggregate CSV data by specified columns using functions like sum, mean, count, min, max, or std to analyze and summarize datasets.
Instructions
Group and aggregate CSV data.
Args:
- filename: Name of the CSV file
- group_by: Column name or list of column names to group by
- aggregations: Dictionary mapping column names to aggregation functions (sum, mean, count, min, max, std, etc.)

Returns:
- Dictionary with grouped and aggregated data
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| filename | Yes | Name of the CSV file | |
| group_by | Yes | Column name or list of column names to group by | |
| aggregations | Yes | Dictionary mapping column names to aggregation functions (sum, mean, count, min, max, std, etc.) | |
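For illustration, a hypothetical call might look like the sketch below. The file name and column names (sales.csv, region, revenue, quantity) are invented for the example; the shape of the result mirrors the dictionary built by CSVManager.group_data in the implementation reference.

```python
# Hypothetical example: group a sales CSV by region and aggregate two columns.
result = group_data(
    filename="sales.csv",
    group_by="region",  # or a list such as ["region", "product"] for multiple keys
    aggregations={"revenue": "sum", "quantity": "mean"},
)

# On success, the result follows the shape returned by CSVManager.group_data:
# {
#     "success": True,
#     "filename": "sales.csv",
#     "group_by": ["region"],
#     "aggregations": {"revenue": "sum", "quantity": "mean"},
#     "grouped_data": [{"region": "East", "revenue": 1200, "quantity": 3.5}, ...],
#     "group_count": 2
# }
```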
Implementation Reference
- csv_mcp_server/server.py:230-252 (handler): MCP tool handler for 'group_data' that delegates to CSVManager.group_data, exposing the input schema through its type hints and converting exceptions into error responses.

```python
@mcp.tool()
def group_data(
    filename: str,
    group_by: Union[str, List[str]],
    aggregations: Dict[str, str]
) -> Dict[str, Any]:
    """
    Group and aggregate CSV data.

    Args:
        filename: Name of the CSV file
        group_by: Column name or list of column names to group by
        aggregations: Dictionary mapping column names to aggregation functions
                      (sum, mean, count, min, max, std, etc.)

    Returns:
        Dictionary with grouped and aggregated data
    """
    try:
        return csv_manager.group_data(filename, group_by, aggregations)
    except Exception as e:
        return {"success": False, "error": str(e)}
```
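Because the handler catches exceptions itself, failures surface as a structured payload rather than a raised error. A sketch of what a failing call might return (the file name is illustrative):

```python
# If CSVManager.group_data raises (e.g. the file is missing or a column
# does not exist), the handler converts the exception into an error payload
# instead of propagating it to the MCP client:
result = group_data("missing.csv", "region", {"revenue": "sum"})
# result == {"success": False, "error": "CSV file 'missing.csv' not found"}
```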
- CSVManager.group_data (core implementation): uses pandas groupby and agg for the grouping and aggregation logic.

```python
def group_data(self, filename: str, group_by: Union[str, List[str]],
               aggregations: Dict[str, str]) -> Dict[str, Any]:
    """Group and aggregate CSV data."""
    filepath = self._get_file_path(filename)
    if not filepath.exists():
        raise FileNotFoundError(f"CSV file '{filename}' not found")

    try:
        df = pd.read_csv(filepath)

        # Ensure group_by is a list
        if isinstance(group_by, str):
            group_by = [group_by]

        # Validate group_by columns exist
        for col in group_by:
            if col not in df.columns:
                raise ValueError(f"Group by column '{col}' not found in CSV")

        # Validate aggregation columns exist
        for col in aggregations.keys():
            if col not in df.columns:
                raise ValueError(f"Aggregation column '{col}' not found in CSV")

        # Group and aggregate
        grouped = df.groupby(group_by).agg(aggregations).reset_index()

        return {
            "success": True,
            "filename": filename,
            "group_by": group_by,
            "aggregations": aggregations,
            "grouped_data": grouped.to_dict('records'),
            "group_count": len(grouped)
        }
    except Exception as e:
        logger.error(f"Failed to group data: {e}")
        raise
```
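As a standalone illustration of the pandas pattern the implementation relies on, the following sketch runs the same groupby/agg step on inline sample data; the DataFrame contents and column names are assumptions made for the example.

```python
import pandas as pd

# Minimal sketch of the groupby/agg pattern used above, on inline sample data
# instead of a CSV file. Column names and values are illustrative only.
df = pd.DataFrame({
    "region":   ["East", "East", "West"],
    "revenue":  [500, 700, 300],
    "quantity": [2, 5, 4],
})

grouped = df.groupby(["region"]).agg({"revenue": "sum", "quantity": "mean"}).reset_index()
print(grouped.to_dict("records"))
# [{'region': 'East', 'revenue': 1200, 'quantity': 3.5},
#  {'region': 'West', 'revenue': 300, 'quantity': 4.0}]
```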