group_aggregate
Group tabular data by specified columns and calculate aggregations like sum, mean, count, or standard deviation to analyze patterns and summarize information.
Instructions
Group data and compute aggregations.
Args:
- file_path: Path to CSV or SQLite file
- group_by: Columns to group by
- aggregations: Dictionary mapping column names to lists of aggregation functions (e.g., {"sales": ["sum", "mean"], "quantity": ["count", "max"]}). Supported: sum, mean, median, min, max, count, std, var

Returns:
- Dictionary containing grouped and aggregated data
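
To make the argument shapes concrete, here is a minimal sketch of what a call with `group_by=["region"]` and the example `aggregations` dictionary would compute. It does not call the MCP tool itself: it reproduces the pandas steps on a small, hypothetical in-memory table (the `region`, `sales`, and `quantity` values are invented for illustration), and the result keys follow the handler's return statement.

```python
import pandas as pd

# Hypothetical sample data standing in for the contents of a CSV file.
df = pd.DataFrame(
    {
        "region": ["east", "east", "west", "west"],
        "sales": [100.0, 300.0, 50.0, 150.0],
        "quantity": [1, 3, 2, 4],
    }
)

group_by = ["region"]
aggregations = {"sales": ["sum", "mean"], "quantity": ["count", "max"]}

# Group, aggregate, and flatten the (column, function) labels to "column_function".
grouped = df.groupby(group_by).agg(aggregations)
grouped.columns = ["_".join(col).strip() for col in grouped.columns]
grouped = grouped.reset_index()

# Same keys as the dictionary the tool returns.
result = {
    "group_by": group_by,
    "aggregations": aggregations,
    "group_count": len(grouped),
    "result": grouped.to_dict(orient="records"),
}
print(result["group_count"])
for row in result["result"]:
    print(row)
```

Each record carries the group key plus one field per requested column and function pair, such as `sales_sum` and `quantity_max`.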
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
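| file_path | Yes | Path to CSV or SQLite file | |
| group_by | Yes | Columns to group by | |
| aggregations | Yes | Dictionary mapping column names to lists of aggregation functions | |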
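
All three arguments are required, and the handler listed under Implementation Reference validates column names but does not appear to check function names against the supported list. A caller might pre-check a payload before sending it. The sketch below is one way to do that; `SUPPORTED`, `unsupported_functions`, and the `sales.csv` path are hypothetical names introduced for illustration and are not part of the server.

```python
# Function names taken from the "Supported" list in the tool's docstring.
SUPPORTED = {"sum", "mean", "median", "min", "max", "count", "std", "var"}


def unsupported_functions(aggregations: dict[str, list[str]]) -> dict[str, list[str]]:
    """Return, per column, the requested function names that are not in SUPPORTED."""
    bad: dict[str, list[str]] = {}
    for column, funcs in aggregations.items():
        rejected = [f for f in funcs if f not in SUPPORTED]
        if rejected:
            bad[column] = rejected
    return bad


# Example argument payload matching the input schema (hypothetical values).
arguments = {
    "file_path": "sales.csv",
    "group_by": ["region"],
    "aggregations": {"sales": ["sum", "mean"], "quantity": ["count", "mode"]},
}

problems = unsupported_functions(arguments["aggregations"])
print(problems)  # {'quantity': ['mode']}
```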
Implementation Reference
- src/mcp_tabular/server.py:489-533 (handler) The handler function for the group_aggregate tool. It is decorated with @mcp.tool() for registration in the FastMCP server. It loads the dataset using _load_data, validates the group-by and aggregation columns, performs a pandas groupby with the specified aggregations (sum, mean, etc.), flattens the multi-level columns, and returns a summary along with the results.

```python
@mcp.tool()
def group_aggregate(
    file_path: str,
    group_by: list[str],
    aggregations: dict[str, list[str]],
) -> dict[str, Any]:
    """
    Group data and compute aggregations.

    Args:
        file_path: Path to CSV or SQLite file
        group_by: Columns to group by
        aggregations: Dictionary mapping column names to list of aggregation functions
                     (e.g., {"sales": ["sum", "mean"], "quantity": ["count", "max"]})
                     Supported: sum, mean, median, min, max, count, std, var

    Returns:
        Dictionary containing grouped and aggregated data
    """
    df = _load_data(file_path)

    # Validate group_by columns
    invalid = [c for c in group_by if c not in df.columns]
    if invalid:
        raise ValueError(f"Group-by columns not found: {invalid}")

    # Validate aggregation columns
    for col in aggregations:
        if col not in df.columns:
            raise ValueError(f"Aggregation column '{col}' not found")

    # Perform groupby
    grouped = df.groupby(group_by).agg(aggregations)

    # Flatten column names
    grouped.columns = ["_".join(col).strip() for col in grouped.columns]
    grouped = grouped.reset_index()

    return {
        "group_by": group_by,
        "aggregations": aggregations,
        "group_count": len(grouped),
        "result": grouped.to_dict(orient="records"),
    }
```
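
The column-flattening step is easiest to follow by comparing the labels before and after. The sketch below applies the same `"_".join(col)` expression to a small, hypothetical DataFrame grouped by two columns; the data and the `before`, `after`, and `flat` variable names are assumptions made for illustration.

```python
import pandas as pd

# Hypothetical data with two grouping columns and one value column.
df = pd.DataFrame(
    {
        "region": ["east", "east", "west"],
        "product": ["a", "a", "b"],
        "sales": [10.0, 30.0, 5.0],
    }
)

grouped = df.groupby(["region", "product"]).agg({"sales": ["sum", "mean"]})

# Aggregating with a dict of lists yields two-level labels: (column, function).
before = grouped.columns.tolist()

# Same flattening expression as in the handler.
grouped.columns = ["_".join(col).strip() for col in grouped.columns]
after = grouped.columns.tolist()

# reset_index turns the group keys back into ordinary columns.
flat = grouped.reset_index()

print(before)
print(after)
print(flat.columns.tolist())
```

Without the flattening step the column labels would remain tuples, and `reset_index()` is what returns the group keys as regular fields in each output record.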