group_by_aggregate
Group CSV data by specific columns and apply aggregation functions for efficient data analysis and summarization within the CSV Editor MCP server.
Instructions
Group data and apply aggregation functions.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
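| aggregations | Yes | Dict mapping column names to aggregation functions (a function name or a list of names), e.g. `{"sales": ["sum", "mean"], "quantity": "sum"}` | |
| group_by | Yes | Columns to group by (list of column names) | |
| session_id | Yes | Session identifier | |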
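As a rough illustration of the input shape, the arguments for a call might look like the sketch below. The `aggregations` value follows the example in the handler's docstring; the `session_id` value and the column names are placeholders, not values from the project.

```python
import json

# Example arguments for group_by_aggregate.
# "example-session-id", "region", "sales" and "quantity" are placeholders;
# use a session ID and column names from your own loaded CSV.
arguments = {
    "session_id": "example-session-id",
    "group_by": ["region"],
    # A column may map to a single function name or to a list of names.
    "aggregations": {"sales": ["sum", "mean"], "quantity": "sum"},
}

# Serialise to JSON, the form in which an MCP client would send the arguments.
print(json.dumps(arguments, indent=2))
```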
Implementation Reference
- Core handler function that implements the group_by_aggregate tool logic using pandas groupby and aggregation, validates inputs, updates session, and returns grouped results.

  ```python
  async def group_by_aggregate(
      session_id: str,
      group_by: List[str],
      aggregations: Dict[str, Union[str, List[str]]],
      ctx: Context = None
  ) -> Dict[str, Any]:
      """
      Group data and apply aggregation functions.

      Args:
          session_id: Session identifier
          group_by: Columns to group by
          aggregations: Dict mapping column names to aggregation functions
                        e.g., {"sales": ["sum", "mean"], "quantity": "sum"}
          ctx: FastMCP context

      Returns:
          Dict with grouped data
      """
      try:
          manager = get_session_manager()
          session = manager.get_session(session_id)

          if not session or session.df is None:
              return {"success": False, "error": "Invalid session or no data loaded"}

          df = session.df

          # Validate group by columns
          missing_cols = [col for col in group_by if col not in df.columns]
          if missing_cols:
              return {"success": False, "error": f"Group by columns not found: {missing_cols}"}

          # Validate aggregation columns
          agg_cols = list(aggregations.keys())
          missing_agg_cols = [col for col in agg_cols if col not in df.columns]
          if missing_agg_cols:
              return {"success": False, "error": f"Aggregation columns not found: {missing_agg_cols}"}

          # Prepare aggregation dict
          agg_dict = {}
          for col, funcs in aggregations.items():
              if isinstance(funcs, str):
                  agg_dict[col] = [funcs]
              else:
                  agg_dict[col] = funcs

          # Perform groupby
          grouped = df.groupby(group_by).agg(agg_dict)

          # Flatten column names
          grouped.columns = ['_'.join(col).strip() if col[1] else col[0]
                             for col in grouped.columns.values]

          # Reset index to make group columns regular columns
          result_df = grouped.reset_index()

          # Convert to dict for response
          result = {
              "data": result_df.to_dict(orient='records'),
              "shape": {
                  "rows": len(result_df),
                  "columns": len(result_df.columns)
              },
              "columns": result_df.columns.tolist()
          }

          # Store grouped data in session
          session.df = result_df
          session.record_operation(OperationType.GROUP_BY, {
              "group_by": group_by,
              "aggregations": aggregations,
              "result_shape": result["shape"]
          })

          return {
              "success": True,
              "grouped_data": result,
              "group_by": group_by,
              "aggregations": aggregations
          }

      except Exception as e:
          logger.error(f"Error in group by aggregate: {str(e)}")
          return {"success": False, "error": str(e)}
  ```
- src/csv_editor/server.py:347-355 (registration) MCP tool registration decorator and wrapper function that exposes the group_by_aggregate handler to the MCP server.

  ```python
  @mcp.tool
  async def group_by_aggregate(
      session_id: str,
      group_by: List[str],
      aggregations: Dict[str, Any],
      ctx: Context = None
  ) -> Dict[str, Any]:
      """Group data and apply aggregation functions."""
      return await _group_by_aggregate(session_id, group_by, aggregations, ctx)
  ```
- src/csv_editor/server.py:307-315 (registration) Import statement that brings in the group_by_aggregate implementation from the analytics module for use in server registration.

  ```python
  from .tools.analytics import (
      get_statistics as _get_statistics,
      get_column_statistics as _get_column_statistics,
      get_correlation_matrix as _get_correlation_matrix,
      group_by_aggregate as _group_by_aggregate,
      get_value_counts as _get_value_counts,
      detect_outliers as _detect_outliers,
      profile_data as _profile_data
  )
  ```
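
The grouping and column-flattening steps of the handler can be tried in isolation with plain pandas. The following is a minimal sketch of that technique only: it leaves out the session manager, validation, and operation recording, and the sample DataFrame is made up for illustration.

```python
import pandas as pd

# Hypothetical sample data, not taken from the CSV Editor project.
df = pd.DataFrame({
    "region": ["east", "east", "west", "west"],
    "sales": [100, 150, 200, 50],
    "quantity": [1, 2, 3, 4],
})

group_by = ["region"]
aggregations = {"sales": ["sum", "mean"], "quantity": "sum"}

# Wrap single function names in a list so that .agg() always returns
# two-level (column, function) labels.
agg_dict = {
    col: [funcs] if isinstance(funcs, str) else funcs
    for col, funcs in aggregations.items()
}

grouped = df.groupby(group_by).agg(agg_dict)

# Join each (column, function) pair into a flat name such as "sales_sum".
grouped.columns = ["_".join(col) for col in grouped.columns]

# Turn the group keys back into ordinary columns.
result_df = grouped.reset_index()

records = result_df.to_dict(orient="records")
print(result_df.columns.tolist())
# → ['region', 'sales_sum', 'sales_mean', 'quantity_sum']
```

Because every aggregation is normalised to a list, each output column gets a `column_function` name even when only one function was requested, which keeps the result's column names predictable.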