group_by_aggregate
Group data and compute aggregations like sum, mean, and count to analyze patterns across segments for insights and reporting.
Instructions
Group data and compute aggregations for analytical insights.
Performs GROUP BY operations with multiple aggregation functions per column. Essential for segmentation analysis and understanding patterns across different data groups.
Returns: Grouped aggregation results with statistics per group
Aggregation Functions: š count, mean, median, sum, min, max š std, var (statistical measures) šÆ first, last (positional) š nunique (unique count)
Examples: # Sales analysis by region result = await group_by_aggregate(ctx, group_by=["region"], aggregations={"sales": ["sum", "mean", "count"]})
AI Workflow Integration: 1. Segmentation analysis and market research 2. Feature engineering for categorical interactions 3. Data summarization for reporting and insights 4. Understanding group-based patterns and trends
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| group_by | Yes | List of columns to group by for segmentation analysis | |
| aggregations | Yes | Dict mapping column names to list of aggregation functions |
Implementation Reference
- The main handler function for the 'group_by_aggregate' tool. It retrieves the session dataframe, validates columns, performs groupby operation, computes statistics on numeric columns for each group, and returns a GroupAggregateResult.async def group_by_aggregate( ctx: Annotated[Context, Field(description="FastMCP context for session access")], group_by: Annotated[ list[str], Field(description="List of columns to group by for segmentation analysis"), ], aggregations: Annotated[ dict[str, list[str]], Field(description="Dict mapping column names to list of aggregation functions"), ], ) -> GroupAggregateResult: """Group data and compute aggregations for analytical insights. Performs GROUP BY operations with multiple aggregation functions per column. Essential for segmentation analysis and understanding patterns across different data groups. Returns: Grouped aggregation results with statistics per group Aggregation Functions: š count, mean, median, sum, min, max š std, var (statistical measures) šÆ first, last (positional) š nunique (unique count) Examples: # Sales analysis by region result = await group_by_aggregate(ctx, group_by=["region"], aggregations={"sales": ["sum", "mean", "count"]}) # Multi-dimensional grouping result = await group_by_aggregate(ctx, group_by=["category", "region"], aggregations={ "price": ["mean", "std"], "quantity": ["sum", "count"] }) AI Workflow Integration: 1. Segmentation analysis and market research 2. Feature engineering for categorical interactions 3. Data summarization for reporting and insights 4. Understanding group-based patterns and trends """ # Get session_id from FastMCP context session_id = ctx.session_id _session, df = get_session_data(session_id) # Validate group by columns missing_cols = [col for col in group_by if col not in df.columns] if missing_cols: raise ColumnNotFoundError(missing_cols[0], df.columns.tolist()) # Validate aggregation columns agg_cols = list(aggregations.keys()) missing_agg_cols = [col for col in agg_cols if col not in df.columns] if missing_agg_cols: raise ColumnNotFoundError(missing_agg_cols[0], df.columns.tolist()) # Perform groupby to get group statistics grouped = df.groupby(group_by) # Create GroupStatistics for each group group_stats = {} for group_name, group_data in grouped: # Convert group name to string for dict key if isinstance(group_name, tuple): group_key = "_".join(str(x) for x in group_name) else: group_key = str(group_name) # Calculate basic statistics for the group # Focus on first numeric column for statistics, or count for non-numeric numeric_cols = group_data.select_dtypes(include=[np.number]).columns if len(numeric_cols) > 0: # Use first numeric column for statistics first_numeric = group_data[numeric_cols[0]] group_stats[group_key] = GroupStatistics( count=len(group_data), mean=float(first_numeric.mean()) if not pd.isna(first_numeric.mean()) else None, sum=float(first_numeric.sum()) if not pd.isna(first_numeric.sum()) else None, min=float(first_numeric.min()) if not pd.isna(first_numeric.min()) else None, max=float(first_numeric.max()) if not pd.isna(first_numeric.max()) else None, std=float(first_numeric.std()) if not pd.isna(first_numeric.std()) else None, ) else: # No numeric columns, just provide count group_stats[group_key] = GroupStatistics(count=len(group_data)) return GroupAggregateResult( groups=group_stats, group_by_columns=group_by, aggregated_columns=agg_cols, total_groups=len(group_stats), )
- Pydantic model defining the output schema for the group_by_aggregate tool response.class GroupAggregateResult(BaseToolResponse): """Response model for group aggregation operations.""" groups: dict[str, GroupStatistics] group_by_columns: list[str] aggregated_columns: list[str] total_groups: int
- Pydantic model used within GroupAggregateResult for per-group statistics.class GroupStatistics(BaseModel): """Statistics for a grouped data segment.""" count: int = Field(description="Number of records in this group") mean: float | None = Field(default=None, description="Mean value for numeric columns") sum: float | None = Field(default=None, description="Sum of values for numeric columns") min: float | None = Field(default=None, description="Minimum value in the group") max: float | None = Field(default=None, description="Maximum value in the group") std: float | None = Field(default=None, description="Standard deviation for numeric columns")
- src/databeak/servers/discovery_server.py:853-853 (registration)Registration of the group_by_aggregate handler as an MCP tool on the discovery_server.discovery_server.tool(name="group_by_aggregate")(group_by_aggregate)