create_pivot_table

Destructive

Creates a pivot table from a specified data range, summarizing data with rows, values, aggregation function, and optional columns.

Instructions

Create pivot table in worksheet.

Input Schema

Name       | Required | Description | Default
-----------|----------|-------------|--------
filepath   | Yes      |             |
sheet_name | Yes      |             |
data_range | Yes      |             |
rows       | Yes      |             |
values     | Yes      |             |
columns    | No       |             |
agg_func   | No       |             | mean
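Since the schema carries no parameter descriptions, a concrete invocation helps. The sketch below is a hypothetical set of tool-call arguments (file name and field names are invented, not part of the server):

```json
{
  "filepath": "sales.xlsx",
  "sheet_name": "Sheet1",
  "data_range": "A1:D100",
  "rows": ["Region"],
  "values": ["Revenue"],
  "columns": [],
  "agg_func": "sum"
}
```

`rows` and `values` must name columns from the header row of `data_range`; `agg_func` must be one of sum, average, count, min, max.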

Output Schema

Name   | Required | Description | Default
-------|----------|-------------|--------
result | Yes      |             |

Implementation Reference

  • The core implementation of the create_pivot_table tool. Handles loading Excel data, validating fields, aggregating values using the specified function, creating a new pivot sheet, writing headers/data, and formatting as an Excel table.
    def create_pivot_table(
        filepath: str,
        sheet_name: str,
        data_range: str,
        rows: list[str],
        values: list[str],
        columns: list[str] | None = None,
        agg_func: str = "sum"
    ) -> dict[str, Any]:
        """Create pivot table in sheet using Excel table functionality

        Args:
            filepath: Path to Excel file
            sheet_name: Name of worksheet containing source data
            data_range: Source data range reference (e.g. 'A1:D10')
            rows: Fields for row labels
            values: Fields for values
            columns: Optional fields for column labels
            agg_func: Aggregation function (sum, average, count, min, max)

        Returns:
            Dictionary with status message and pivot table dimensions
        """
        try:
            wb = load_workbook(filepath)
            if sheet_name not in wb.sheetnames:
                raise ValidationError(f"Sheet '{sheet_name}' not found")
            
            # Parse ranges
            if ':' not in data_range:
                raise ValidationError("Data range must be in format 'A1:B2'")
                
            try:
                start_cell, end_cell = data_range.split(':')
                start_row, start_col, end_row, end_col = parse_cell_range(start_cell, end_cell)
            except ValueError as e:
                raise ValidationError(f"Invalid data range format: {str(e)}")
                
            if end_row is None or end_col is None:
                raise ValidationError("Invalid data range format: missing end coordinates")
                
            # Create range string
            data_range_str = f"{get_column_letter(start_col)}{start_row}:{get_column_letter(end_col)}{end_row}"
            
            # Clean up field names by removing aggregation suffixes
            def clean_field_name(field: str) -> str:
                field = str(field).strip()
                for suffix in [" (sum)", " (average)", " (count)", " (min)", " (max)"]:
                    if field.lower().endswith(suffix):
                        return field[:-len(suffix)]
                return field
    
            # Read source data and convert to list of dicts
            try:
                data_as_list = read_excel_range(filepath, sheet_name, start_cell, end_cell)
                if not data_as_list or len(data_as_list) < 2:
                    raise PivotError("Source data must have a header row and at least one data row.")
                
                headers = [str(h) for h in data_as_list[0]]
                data = [dict(zip(headers, row)) for row in data_as_list[1:]]
    
                if not data:
                    raise PivotError("No data rows found after header.")
    
            except Exception as e:
                raise PivotError(f"Failed to read or process source data: {str(e)}")
    
            # Validate aggregation function
            valid_agg_funcs = ["sum", "average", "count", "min", "max"]
            if agg_func.lower() not in valid_agg_funcs:
                raise ValidationError(
                    f"Invalid aggregation function. Must be one of: {', '.join(valid_agg_funcs)}"
                )
    
            # Validate field names exist in data
            if data:
                available_fields_raw = data[0].keys()
                available_fields = {clean_field_name(str(header)).lower() for header in available_fields_raw}
                
                for field_list, field_type in [(rows, "row"), (values, "value")]:
                    for field in field_list:
                        if clean_field_name(str(field)).lower() not in available_fields:
                            raise ValidationError(
                                f"Invalid {field_type} field '{field}'. "
                                f"Available fields: {', '.join(sorted(available_fields_raw))}"
                            )
    
                if columns:
                    for field in columns:
                        if clean_field_name(str(field)).lower() not in available_fields:
                            raise ValidationError(
                                f"Invalid column field '{field}'. "
                                f"Available fields: {', '.join(sorted(available_fields_raw))}"
                            )
    
            # Clean up row and value field names
            cleaned_rows = [clean_field_name(field) for field in rows]
            cleaned_values = [clean_field_name(field) for field in values]
    
            # Create pivot sheet
            pivot_sheet_name = f"{sheet_name}_pivot"
            if pivot_sheet_name in wb.sheetnames:
                wb.remove(wb[pivot_sheet_name])
            pivot_ws = wb.create_sheet(pivot_sheet_name)
    
            # Write headers
            current_row = 1
            current_col = 1
            
            # Write row field headers
            for field in cleaned_rows:
                cell = pivot_ws.cell(row=current_row, column=current_col, value=field)
                cell.font = Font(bold=True)
                current_col += 1
                
            # Write value field headers
            for field in cleaned_values:
                cell = pivot_ws.cell(row=current_row, column=current_col, value=f"{field} ({agg_func})")
                cell.font = Font(bold=True)
                current_col += 1
    
            # Get unique values for each row field
            field_values = {}
            for field in cleaned_rows:
                all_values = []
                for record in data:
                    value = str(record.get(field, ''))
                    all_values.append(value)
                field_values[field] = sorted(set(all_values))
    
            # Generate all combinations of row field values
            row_combinations = _get_combinations(field_values)
    
            # Calculate table dimensions for formatting
            total_rows = len(row_combinations) + 1  # +1 for header
            total_cols = len(cleaned_rows) + len(cleaned_values)
            
            # Write data rows
            current_row = 2
            for combo in row_combinations:
                # Write row field values
                col = 1
                for field in cleaned_rows:
                    pivot_ws.cell(row=current_row, column=col, value=combo[field])
                    col += 1
                
                # Filter data for current combination
                filtered_data = _filter_data(data, combo, {})
                
                # Calculate and write aggregated values
                for value_field in cleaned_values:
                    try:
                        value = _aggregate_values(filtered_data, value_field, agg_func)
                        pivot_ws.cell(row=current_row, column=col, value=value)
                    except Exception as e:
                        raise PivotError(f"Failed to aggregate values for field '{value_field}': {str(e)}")
                    col += 1
                    
                current_row += 1
    
            # Create a table for the pivot data
            try:
                pivot_range = f"A1:{get_column_letter(total_cols)}{total_rows}"
                pivot_table = Table(
                    displayName=f"PivotTable_{uuid.uuid4().hex[:8]}", 
                    ref=pivot_range
                )
                style = TableStyleInfo(
                    name="TableStyleMedium9",
                    showFirstColumn=False,
                    showLastColumn=False,
                    showRowStripes=True,
                    showColumnStripes=True
                )
                pivot_table.tableStyleInfo = style
                pivot_ws.add_table(pivot_table)
            except Exception as e:
                raise PivotError(f"Failed to create pivot table formatting: {str(e)}")
    
            try:
                wb.save(filepath)
            except Exception as e:
                raise PivotError(f"Failed to save workbook: {str(e)}")
            
            return {
                "message": "Summary table created successfully",
                "details": {
                    "source_range": data_range_str,
                    "pivot_sheet": pivot_sheet_name,
                    "rows": cleaned_rows,
                    "columns": columns or [],
                    "values": cleaned_values,
                    "aggregation": agg_func
                }
            }
            
        except (ValidationError, PivotError) as e:
            logger.error(str(e))
            raise
        except Exception as e:
            logger.error(f"Failed to create pivot table: {e}")
            raise PivotError(str(e))
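The core of the implementation above is a combine/filter/aggregate loop: build every combination of row-field values, filter the source records to each combination, then aggregate the value fields. A minimal pure-Python sketch of that same flow, with all data and field names hypothetical:

```python
from itertools import product

# Hypothetical source records, mirroring what read_excel_range plus
# dict(zip(headers, row)) produces from the data range.
data = [
    {"Region": "East", "Product": "A", "Revenue": 100},
    {"Region": "East", "Product": "B", "Revenue": 50},
    {"Region": "West", "Product": "A", "Revenue": 70},
]

rows = ["Region"]
values = ["Revenue"]
agg_func = "sum"

# Unique values per row field (mirrors the field_values loop above)
field_values = {f: sorted({str(r.get(f, "")) for r in data}) for f in rows}

# Cartesian product of row-field values (mirrors _get_combinations)
combos = [dict(zip(field_values, c)) for c in product(*field_values.values())]

pivot = []
for combo in combos:
    # Records matching this combination (mirrors _filter_data)
    subset = [r for r in data
              if all(str(r.get(f, "")) == v for f, v in combo.items())]
    # Numeric values only, summed (mirrors _aggregate_values)
    nums = [r[vf] for r in subset for vf in values
            if isinstance(r.get(vf), (int, float))]
    pivot.append({**combo, f"{values[0]} ({agg_func})": sum(nums)})

print(pivot)
# [{'Region': 'East', 'Revenue (sum)': 150}, {'Region': 'West', 'Revenue (sum)': 70}]
```

Note that, as in the implementation, non-numeric cells are silently excluded from aggregation.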
  • MCP tool registration for create_pivot_table. The @mcp.tool decorator registers it with FastMCP under title 'Create Pivot Table'. It delegates to the implementation imported from excel_mcp.pivot.
    @mcp.tool(
        annotations=ToolAnnotations(
            title="Create Pivot Table",
            destructiveHint=True,
        ),
    )
    def create_pivot_table(
        filepath: str,
        sheet_name: str,
        data_range: str,
        rows: List[str],
        values: List[str],
        columns: Optional[List[str]] = None,
        agg_func: str = "mean"
    ) -> str:
        """Create pivot table in worksheet."""
        try:
            full_path = get_excel_path(filepath)
            result = create_pivot_table_impl(
                filepath=full_path,
                sheet_name=sheet_name,
                data_range=data_range,
                rows=rows,
                values=values,
                columns=columns or [],
                agg_func=agg_func
            )
            return result["message"]
        except (ValidationError, PivotError) as e:
            return f"Error: {str(e)}"
        except Exception as e:
            logger.error(f"Error creating pivot table: {e}")
            raise
  • Helper that generates all Cartesian product combinations of row field values for the pivot table.
    def _get_combinations(field_values: dict[str, set]) -> list[dict]:
        """Get all combinations of field values."""
        result = [{}]
        for field, values in list(field_values.items()):  # Convert to list to avoid runtime changes
            new_result = []
            for combo in result:
                for value in sorted(values):  # Sort for consistent ordering
                    new_combo = combo.copy()
                    new_combo[field] = value
                    new_result.append(new_combo)
            result = new_result
        return result
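The incremental expansion above is equivalent to `itertools.product`, since the outer loop grows the result by one field at a time in insertion order. A compact re-implementation (the sample input is hypothetical):

```python
from itertools import product

def get_combinations(field_values: dict[str, list[str]]) -> list[dict]:
    """Cartesian product of row-field values, one dict per combination."""
    fields = list(field_values)
    return [dict(zip(fields, combo))
            for combo in product(*(sorted(field_values[f]) for f in fields))]

combos = get_combinations({"Region": ["West", "East"], "Year": ["2024"]})
print(combos)
# [{'Region': 'East', 'Year': '2024'}, {'Region': 'West', 'Year': '2024'}]
```

Both versions sort each field's values, so the pivot rows come out in a deterministic order.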
  • Helper that filters data records based on row and column filter criteria for aggregation.
    def _filter_data(data: list[dict], row_filters: dict, col_filters: dict) -> list[dict]:
        """Filter data based on row and column filters."""
        result = []
        for record in data:
            matches = True
            for field, value in row_filters.items():
                if record.get(field) != value:
                    matches = False
                    break
            for field, value in col_filters.items():
                if record.get(field) != value:
                    matches = False
                    break
            if matches:
                result.append(record)
        return result
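A quick check of the filtering behavior, using an equivalent compact form of the helper so the snippet runs standalone (records are hypothetical):

```python
def filter_data(data: list[dict], row_filters: dict, col_filters: dict) -> list[dict]:
    """Keep records whose fields match every row and column filter value."""
    return [
        record for record in data
        if all(record.get(f) == v for f, v in row_filters.items())
        and all(record.get(f) == v for f, v in col_filters.items())
    ]

data = [
    {"Region": "East", "Revenue": 100},
    {"Region": "West", "Revenue": 70},
]
matches = filter_data(data, {"Region": "East"}, {})
print(matches)  # [{'Region': 'East', 'Revenue': 100}]
```

Missing fields compare as `None` via `record.get`, so a record lacking a filtered field simply never matches.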
  • Helper that performs aggregation (sum, average, count, min, max) on a field's values.
    def _aggregate_values(data: list[dict], field: str, agg_func: str) -> float:
        """Aggregate values using the specified function."""
        values = [record[field] for record in data if field in record and isinstance(record[field], (int, float))]
        if not values:
            return 0
            
        if agg_func == "sum":
            return sum(values)
        elif agg_func == "average":
            return sum(values) / len(values)
        elif agg_func == "count":
            return len(values)
        elif agg_func == "min":
            return min(values)
        elif agg_func == "max":
            return max(values)
        else:
            return sum(values)  # Default to sum
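The aggregation helper skips non-numeric cells before applying the function; a self-contained reproduction with hypothetical records illustrates this:

```python
def aggregate_values(data: list[dict], field: str, agg_func: str) -> float:
    """Aggregate a field's numeric values; non-numeric cells are ignored."""
    values = [r[field] for r in data if isinstance(r.get(field), (int, float))]
    if not values:
        return 0
    if agg_func == "average":
        return sum(values) / len(values)
    if agg_func == "count":
        return len(values)
    if agg_func == "min":
        return min(values)
    if agg_func == "max":
        return max(values)
    return sum(values)  # "sum" and unrecognized functions fall back to sum

data = [{"Revenue": 100}, {"Revenue": 50}, {"Revenue": "n/a"}]
print(aggregate_values(data, "Revenue", "average"))  # 75.0
print(aggregate_values(data, "Revenue", "count"))    # 2
```

Note that `count` counts only numeric cells, so the string `"n/a"` above is excluded, and an all-empty column aggregates to 0 rather than raising.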
Behavior: 2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

Annotations already indicate destructiveHint=true, so the description's basic 'Create' is consistent but adds no extra context about side effects, permission needs, or what happens to existing data.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness: 2/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is too brief for a tool with 7 parameters; it is under-specified rather than concise, sacrificing essential information for brevity.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness: 1/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the complexity (7 parameters, 0% parameter docs, no usage guidance) the description is woefully incomplete; an agent would struggle to invoke this tool correctly.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters: 1/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

Schema descriptions are completely missing (0% coverage), and the tool description provides no hints about what each parameter means or how they should be used together.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose: 4/5

Does the description clearly state what the tool does and how it differs from similar tools?

Description clearly states the action (create) and resource (pivot table in worksheet), but does not differentiate from sibling tools like create_table or create_chart, which also create different objects.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines: 2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

No guidance is provided on when to use this tool, what prerequisites exist (e.g., data must be in a certain format), or when alternatives are more appropriate.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.
