Glama

group_by_aggregate

Group CSV data by specific columns and apply aggregation functions for efficient data analysis and summarization within the CSV Editor MCP server.

Instructions

Group data and apply aggregation functions.

Input Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| `aggregations` | Yes | Dict mapping column names to aggregation functions, e.g. `{"sales": ["sum", "mean"], "quantity": "sum"}` | — |
| `group_by` | Yes | Columns to group by | — |
| `session_id` | Yes | Session identifier | — |
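As an illustration, a call to this tool might supply arguments shaped like the following (a minimal sketch; the session id value is a placeholder, and per the handler below each aggregation value may be either a single function name or a list of names):

```python
# Hypothetical arguments for group_by_aggregate; "sess-1234" is a
# placeholder session id obtained from a prior load operation.
arguments = {
    "session_id": "sess-1234",
    "group_by": ["region"],
    # Each value may be one function name or a list of names.
    "aggregations": {"sales": ["sum", "mean"], "quantity": "sum"},
}
```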

Implementation Reference

  • Core handler function that implements the group_by_aggregate tool logic using pandas groupby and aggregation, validates inputs, updates session, and returns grouped results.
    async def group_by_aggregate(
        session_id: str,
        group_by: List[str],
        aggregations: Dict[str, Union[str, List[str]]],
        ctx: Context = None
    ) -> Dict[str, Any]:
        """
        Group data and apply aggregation functions.

        Args:
            session_id: Session identifier
            group_by: Columns to group by
            aggregations: Dict mapping column names to aggregation functions
                e.g., {"sales": ["sum", "mean"], "quantity": "sum"}
            ctx: FastMCP context

        Returns:
            Dict with grouped data
        """
        try:
            manager = get_session_manager()
            session = manager.get_session(session_id)
            if not session or session.df is None:
                return {"success": False, "error": "Invalid session or no data loaded"}

            df = session.df

            # Validate group by columns
            missing_cols = [col for col in group_by if col not in df.columns]
            if missing_cols:
                return {"success": False, "error": f"Group by columns not found: {missing_cols}"}

            # Validate aggregation columns
            agg_cols = list(aggregations.keys())
            missing_agg_cols = [col for col in agg_cols if col not in df.columns]
            if missing_agg_cols:
                return {"success": False, "error": f"Aggregation columns not found: {missing_agg_cols}"}

            # Prepare aggregation dict
            agg_dict = {}
            for col, funcs in aggregations.items():
                if isinstance(funcs, str):
                    agg_dict[col] = [funcs]
                else:
                    agg_dict[col] = funcs

            # Perform groupby
            grouped = df.groupby(group_by).agg(agg_dict)

            # Flatten column names
            grouped.columns = ['_'.join(col).strip() if col[1] else col[0]
                               for col in grouped.columns.values]

            # Reset index to make group columns regular columns
            result_df = grouped.reset_index()

            # Convert to dict for response
            result = {
                "data": result_df.to_dict(orient='records'),
                "shape": {
                    "rows": len(result_df),
                    "columns": len(result_df.columns)
                },
                "columns": result_df.columns.tolist()
            }

            # Store grouped data in session
            session.df = result_df
            session.record_operation(OperationType.GROUP_BY, {
                "group_by": group_by,
                "aggregations": aggregations,
                "result_shape": result["shape"]
            })

            return {
                "success": True,
                "grouped_data": result,
                "group_by": group_by,
                "aggregations": aggregations
            }
        except Exception as e:
            logger.error(f"Error in group by aggregate: {str(e)}")
            return {"success": False, "error": str(e)}
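The groupby-and-flatten step above can be reproduced in isolation. The following is a minimal pandas sketch (the sample data is invented here) showing how multi-level aggregation columns collapse into names like `sales_sum`:

```python
import pandas as pd

df = pd.DataFrame({
    "region": ["east", "east", "west"],
    "sales": [100, 200, 50],
    "quantity": [1, 2, 3],
})

# Same aggregation and column-flattening logic as the handler above
grouped = df.groupby(["region"]).agg({"sales": ["sum", "mean"], "quantity": ["sum"]})
grouped.columns = ['_'.join(col).strip() if col[1] else col[0]
                   for col in grouped.columns.values]
result_df = grouped.reset_index()

print(result_df.columns.tolist())
# ['region', 'sales_sum', 'sales_mean', 'quantity_sum']
```

Flattening matters because `result_df.to_dict(orient='records')` needs plain string keys; a pandas MultiIndex column would not serialize cleanly into the JSON response.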
  • MCP tool registration decorator and wrapper function that exposes the group_by_aggregate handler to the MCP server.
    @mcp.tool
    async def group_by_aggregate(
        session_id: str,
        group_by: List[str],
        aggregations: Dict[str, Any],
        ctx: Context = None
    ) -> Dict[str, Any]:
        """Group data and apply aggregation functions."""
        return await _group_by_aggregate(session_id, group_by, aggregations, ctx)
  • Import statement that brings in the group_by_aggregate implementation from analytics module for use in server registration.
    from .tools.analytics import (
        get_statistics as _get_statistics,
        get_column_statistics as _get_column_statistics,
        get_correlation_matrix as _get_correlation_matrix,
        group_by_aggregate as _group_by_aggregate,
        get_value_counts as _get_value_counts,
        detect_outliers as _detect_outliers,
        profile_data as _profile_data
    )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/santoshray02/csv-editor'
