group_by_aggregate

Group CSV data by specified columns and apply aggregation functions to analyze and summarize information from large datasets.

Instructions

Group data and apply aggregation functions.

Input Schema

| Name         | Required | Description | Default |
| session_id   | Yes      |             |         |
| group_by     | Yes      |             |         |
| aggregations | Yes      |             |         |
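
As an illustration of the three required inputs, here is a minimal sketch of an argument payload and of how single function names could be normalized into lists before aggregation. The session ID and the column names (`region`, `sales`, `quantity`) are made up for the example, and `normalize_aggregations` is a hypothetical helper, not part of the server's API; it only mirrors the string-to-list step shown in the implementation reference.

```python
from typing import Any, Dict, List, Union

# Hypothetical arguments: the session ID and column names are placeholders.
arguments: Dict[str, Any] = {
    "session_id": "example-session-id",
    "group_by": ["region"],
    "aggregations": {"sales": ["sum", "mean"], "quantity": "sum"},
}


def normalize_aggregations(
    aggregations: Dict[str, Union[str, List[str]]]
) -> Dict[str, List[str]]:
    """Wrap bare function names in a list so every column maps to a list."""
    return {
        col: [funcs] if isinstance(funcs, str) else list(funcs)
        for col, funcs in aggregations.items()
    }


normalized = normalize_aggregations(arguments["aggregations"])
print(normalized)  # {'sales': ['sum', 'mean'], 'quantity': ['sum']}
```

Accepting either a string or a list keeps the call site short for the common single-function case while giving the aggregation step one uniform shape to work with.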

Implementation Reference

  • Core handler function that performs the group-by aggregation with pandas groupby. It validates the inputs, runs groupby.agg, flattens the resulting MultiIndex columns, stores the result in the session, and records the operation.
    async def group_by_aggregate(
        session_id: str,
        group_by: List[str],
        aggregations: Dict[str, Union[str, List[str]]],
        ctx: Context = None
    ) -> Dict[str, Any]:
        """
        Group data and apply aggregation functions.

        Args:
            session_id: Session identifier
            group_by: Columns to group by
            aggregations: Dict mapping column names to aggregation functions
                e.g., {"sales": ["sum", "mean"], "quantity": "sum"}
            ctx: FastMCP context

        Returns:
            Dict with grouped data
        """
        try:
            manager = get_session_manager()
            session = manager.get_session(session_id)

            if not session or session.df is None:
                return {"success": False, "error": "Invalid session or no data loaded"}

            df = session.df

            # Validate group by columns
            missing_cols = [col for col in group_by if col not in df.columns]
            if missing_cols:
                return {"success": False, "error": f"Group by columns not found: {missing_cols}"}

            # Validate aggregation columns
            agg_cols = list(aggregations.keys())
            missing_agg_cols = [col for col in agg_cols if col not in df.columns]
            if missing_agg_cols:
                return {"success": False, "error": f"Aggregation columns not found: {missing_agg_cols}"}

            # Prepare aggregation dict
            agg_dict = {}
            for col, funcs in aggregations.items():
                if isinstance(funcs, str):
                    agg_dict[col] = [funcs]
                else:
                    agg_dict[col] = funcs

            # Perform groupby
            grouped = df.groupby(group_by).agg(agg_dict)

            # Flatten column names
            grouped.columns = ['_'.join(col).strip() if col[1] else col[0]
                               for col in grouped.columns.values]

            # Reset index to make group columns regular columns
            result_df = grouped.reset_index()

            # Convert to dict for response
            result = {
                "data": result_df.to_dict(orient='records'),
                "shape": {
                    "rows": len(result_df),
                    "columns": len(result_df.columns)
                },
                "columns": result_df.columns.tolist()
            }

            # Store grouped data in session
            session.df = result_df
            session.record_operation(OperationType.GROUP_BY, {
                "group_by": group_by,
                "aggregations": aggregations,
                "result_shape": result["shape"]
            })

            return {
                "success": True,
                "grouped_data": result,
                "group_by": group_by,
                "aggregations": aggregations
            }

        except Exception as e:
            logger.error(f"Error in group by aggregate: {str(e)}")
            return {"success": False, "error": str(e)}
  • MCP tool registration with @mcp.tool decorator. Wrapper function that delegates to the core implementation in analytics.py.
    @mcp.tool
    async def group_by_aggregate(
        session_id: str,
        group_by: List[str],
        aggregations: Dict[str, Any],
        ctx: Context = None
    ) -> Dict[str, Any]:
        """Group data and apply aggregation functions."""
        return await _group_by_aggregate(session_id, group_by, aggregations, ctx)
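
To make the aggregation and column-flattening steps of the core handler concrete, the following is a minimal standalone sketch using only pandas. It leaves out the session manager, operation recording, and error handling, and the sample data and column names are invented for illustration.

```python
import pandas as pd

# Invented sample data standing in for a loaded CSV session.
df = pd.DataFrame({
    "region": ["east", "east", "west"],
    "sales": [10.0, 30.0, 5.0],
    "quantity": [1, 2, 3],
})

# Every column maps to a list of functions, so agg() returns MultiIndex
# columns such as ("sales", "sum") and ("sales", "mean").
agg_dict = {"sales": ["sum", "mean"], "quantity": ["sum"]}
grouped = df.groupby(["region"]).agg(agg_dict)

# Flatten ("sales", "sum") into "sales_sum", and so on.
grouped.columns = [
    "_".join(col) if col[1] else col[0] for col in grouped.columns.values
]

# Turn the group keys back into a regular column.
result_df = grouped.reset_index()
records = result_df.to_dict(orient="records")

print(result_df.columns.tolist())
# ['region', 'sales_sum', 'sales_mean', 'quantity_sum']
print(records[0])
```

Flattening the column names this way lets each record serialize to a flat dictionary with string keys, which is easier to return in a JSON response than tuple-keyed columns.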

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/santoshray02/csv-editor'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.