group_by_aggregate
Group CSV data by specified columns and apply aggregation functions to analyze and summarize information from large datasets.
Instructions
Group data and apply aggregation functions.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| session_id | Yes | Session identifier for the loaded CSV data | |
| group_by | Yes | Columns to group by | |
| aggregations | Yes | Dict mapping column names to aggregation functions, e.g. `{"sales": ["sum", "mean"], "quantity": "sum"}` | |
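For illustration, a minimal argument payload matching this schema might look like the sketch below. The session ID and the `region`, `sales`, and `quantity` column names are hypothetical; only the shape of the `aggregations` dict comes from the tool's docstring.

```python
# Hypothetical arguments for group_by_aggregate (column names are illustrative).
arguments = {
    "session_id": "abc123",           # identifier returned when the CSV was loaded
    "group_by": ["region"],           # one or more grouping columns
    "aggregations": {
        "sales": ["sum", "mean"],     # multiple aggregation functions per column
        "quantity": "sum",            # or a single function given as a string
    },
}
```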
Implementation Reference
- Core handler function (in `analytics.py`, see the registration below) that performs the group-by aggregation using pandas `groupby`. It validates inputs, executes `groupby(...).agg(...)`, flattens the resulting MultiIndex columns, stores the result in the session, and records the operation.

```python
async def group_by_aggregate(
    session_id: str,
    group_by: List[str],
    aggregations: Dict[str, Union[str, List[str]]],
    ctx: Context = None
) -> Dict[str, Any]:
    """
    Group data and apply aggregation functions.

    Args:
        session_id: Session identifier
        group_by: Columns to group by
        aggregations: Dict mapping column names to aggregation functions
                      e.g., {"sales": ["sum", "mean"], "quantity": "sum"}
        ctx: FastMCP context

    Returns:
        Dict with grouped data
    """
    try:
        manager = get_session_manager()
        session = manager.get_session(session_id)

        if not session or session.df is None:
            return {"success": False, "error": "Invalid session or no data loaded"}

        df = session.df

        # Validate group by columns
        missing_cols = [col for col in group_by if col not in df.columns]
        if missing_cols:
            return {"success": False, "error": f"Group by columns not found: {missing_cols}"}

        # Validate aggregation columns
        agg_cols = list(aggregations.keys())
        missing_agg_cols = [col for col in agg_cols if col not in df.columns]
        if missing_agg_cols:
            return {"success": False, "error": f"Aggregation columns not found: {missing_agg_cols}"}

        # Prepare aggregation dict
        agg_dict = {}
        for col, funcs in aggregations.items():
            if isinstance(funcs, str):
                agg_dict[col] = [funcs]
            else:
                agg_dict[col] = funcs

        # Perform groupby
        grouped = df.groupby(group_by).agg(agg_dict)

        # Flatten column names
        grouped.columns = ['_'.join(col).strip() if col[1] else col[0]
                           for col in grouped.columns.values]

        # Reset index to make group columns regular columns
        result_df = grouped.reset_index()

        # Convert to dict for response
        result = {
            "data": result_df.to_dict(orient='records'),
            "shape": {
                "rows": len(result_df),
                "columns": len(result_df.columns)
            },
            "columns": result_df.columns.tolist()
        }

        # Store grouped data in session
        session.df = result_df
        session.record_operation(OperationType.GROUP_BY, {
            "group_by": group_by,
            "aggregations": aggregations,
            "result_shape": result["shape"]
        })

        return {
            "success": True,
            "grouped_data": result,
            "group_by": group_by,
            "aggregations": aggregations
        }

    except Exception as e:
        logger.error(f"Error in group by aggregate: {str(e)}")
        return {"success": False, "error": str(e)}
```
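As a standalone illustration of the transformation the handler performs, the following sketch uses plain pandas with a hypothetical `region`/`sales`/`quantity` DataFrame (no session handling) to show the `groupby(...).agg(...)` call and the MultiIndex flattening step:

```python
import pandas as pd

df = pd.DataFrame({
    "region": ["east", "east", "west"],
    "sales": [100, 150, 200],
    "quantity": [1, 2, 3],
})

# Same shape of aggregation dict the tool accepts; bare strings are wrapped into lists.
aggregations = {"sales": ["sum", "mean"], "quantity": "sum"}
agg_dict = {col: [f] if isinstance(f, str) else f for col, f in aggregations.items()}

grouped = df.groupby(["region"]).agg(agg_dict)

# agg() with lists yields MultiIndex columns such as ("sales", "sum");
# flattening produces "sales_sum", "sales_mean", "quantity_sum".
grouped.columns = ["_".join(col).strip() if col[1] else col[0] for col in grouped.columns.values]
result_df = grouped.reset_index()

print(result_df.to_dict(orient="records"))
# [{'region': 'east', 'sales_sum': 250, 'sales_mean': 125.0, 'quantity_sum': 3},
#  {'region': 'west', 'sales_sum': 200, 'sales_mean': 200.0, 'quantity_sum': 3}]
```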
- src/csv_editor/server.py:347-355 (registration): MCP tool registration with the `@mcp.tool` decorator. The wrapper delegates to the core implementation in `analytics.py`.

```python
@mcp.tool
async def group_by_aggregate(
    session_id: str,
    group_by: List[str],
    aggregations: Dict[str, Any],
    ctx: Context = None
) -> Dict[str, Any]:
    """Group data and apply aggregation functions."""
    return await _group_by_aggregate(session_id, group_by, aggregations, ctx)
```
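A client-side call might look roughly like the sketch below. It assumes FastMCP's `Client` API and a hypothetical session ID from a prior CSV-load call, so treat it as illustrative rather than the server's documented usage.

```python
import asyncio
from fastmcp import Client

async def main():
    # Assumes the server entry point is src/csv_editor/server.py; adjust to the actual path.
    async with Client("src/csv_editor/server.py") as client:
        result = await client.call_tool(
            "group_by_aggregate",
            {
                "session_id": "abc123",  # hypothetical: returned by an earlier load operation
                "group_by": ["region"],
                "aggregations": {"sales": ["sum", "mean"], "quantity": "sum"},
            },
        )
        print(result)

asyncio.run(main())
```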