Skip to main content
Glama
santoshray02

CSV Editor

by santoshray02

group_by_aggregate

Group CSV data by specified columns and apply aggregation functions to analyze and summarize information from large datasets.

Instructions

Group data and apply aggregation functions.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
session_idYes
group_byYes
aggregationsYes

Implementation Reference

  • Core handler function that performs the groupby aggregation logic using pandas groupby. Validates inputs, executes groupby.agg, flattens multiindex columns, stores result in session, records operation.
    async def group_by_aggregate(
        session_id: str,
        group_by: List[str],
        aggregations: Dict[str, Union[str, List[str]]],
        ctx: Context = None
    ) -> Dict[str, Any]:
        """
        Group data and apply aggregation functions.
        
        Args:
            session_id: Session identifier
            group_by: Columns to group by
            aggregations: Dict mapping column names to aggregation functions
                         e.g., {"sales": ["sum", "mean"], "quantity": "sum"}
            ctx: FastMCP context
            
        Returns:
            Dict with grouped data
        """
        try:
            manager = get_session_manager()
            session = manager.get_session(session_id)
            
            if not session or session.df is None:
                return {"success": False, "error": "Invalid session or no data loaded"}
            
            df = session.df
            
            # Validate group by columns
            missing_cols = [col for col in group_by if col not in df.columns]
            if missing_cols:
                return {"success": False, "error": f"Group by columns not found: {missing_cols}"}
            
            # Validate aggregation columns
            agg_cols = list(aggregations.keys())
            missing_agg_cols = [col for col in agg_cols if col not in df.columns]
            if missing_agg_cols:
                return {"success": False, "error": f"Aggregation columns not found: {missing_agg_cols}"}
            
            # Prepare aggregation dict
            agg_dict = {}
            for col, funcs in aggregations.items():
                if isinstance(funcs, str):
                    agg_dict[col] = [funcs]
                else:
                    agg_dict[col] = funcs
            
            # Perform groupby
            grouped = df.groupby(group_by).agg(agg_dict)
            
            # Flatten column names
            grouped.columns = ['_'.join(col).strip() if col[1] else col[0] 
                              for col in grouped.columns.values]
            
            # Reset index to make group columns regular columns
            result_df = grouped.reset_index()
            
            # Convert to dict for response
            result = {
                "data": result_df.to_dict(orient='records'),
                "shape": {
                    "rows": len(result_df),
                    "columns": len(result_df.columns)
                },
                "columns": result_df.columns.tolist()
            }
            
            # Store grouped data in session
            session.df = result_df
            session.record_operation(OperationType.GROUP_BY, {
                "group_by": group_by,
                "aggregations": aggregations,
                "result_shape": result["shape"]
            })
            
            return {
                "success": True,
                "grouped_data": result,
                "group_by": group_by,
                "aggregations": aggregations
            }
            
        except Exception as e:
            logger.error(f"Error in group by aggregate: {str(e)}")
            return {"success": False, "error": str(e)}
  • MCP tool registration with @mcp.tool decorator. Wrapper function that delegates to the core implementation in analytics.py.
    @mcp.tool
    async def group_by_aggregate(
        session_id: str,
        group_by: List[str],
        aggregations: Dict[str, Any],
        ctx: Context = None
    ) -> Dict[str, Any]:
        """Group data and apply aggregation functions."""
        return await _group_by_aggregate(session_id, group_by, aggregations, ctx)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/santoshray02/csv-editor'

If you have feedback or need assistance with the MCP directory API, please join our Discord server