hyunjae-labs

xlwings Excel MCP Server

create_pivot_table

Create pivot tables in Excel to summarize and analyze data by organizing values into rows, columns, and aggregated calculations.

Instructions

Create a pivot table in a worksheet.

Args:
    sheet_name: Name of worksheet containing source data
    data_range: Source data range (e.g., "A1:E100" or "Sheet2!A1:E100")
    rows: Field names for row labels
    values: Field names for values
    session_id: Session ID from open_workbook (preferred)
    filepath: Path to Excel file (legacy, deprecated)
    columns: Field names for column labels (optional)
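    agg_func: Aggregation function (sum, count, average, max, min; "mean" is accepted as an alias for average and is the default)
    target_sheet: Target sheet for pivot table (optional; created if it does not exist, and a new "PivotTable" sheet is added when omitted)
    target_cell: Target cell for pivot table (optional; defaults to a cell in column A below any existing content)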
    pivot_name: Custom name for pivot table (optional, auto-generated if not provided)
    
Note: Prefer session_id, which reuses an already-open workbook instead of starting Excel and reopening the file on each call. The filepath parameter is deprecated.
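
As a rough illustration of the arguments listed above, the sketch below assembles a call payload and applies the "session_id preferred, filepath as fallback" rule. The helper name `build_pivot_args`, the sheet and field names, and the session ID value are hypothetical; only the parameter names come from the tool's signature.

```python
import json
from typing import Any, Dict, List, Optional


def build_pivot_args(
    sheet_name: str,
    data_range: str,
    rows: List[str],
    values: List[str],
    session_id: Optional[str] = None,
    filepath: Optional[str] = None,
    **optional: Any,
) -> Dict[str, Any]:
    """Assemble an arguments dict for create_pivot_table (hypothetical helper)."""
    if not session_id and not filepath:
        # The tool itself answers with a PARAMETER_MISSING error in this case
        raise ValueError("either session_id or filepath is required")
    args: Dict[str, Any] = {
        "sheet_name": sheet_name,
        "data_range": data_range,
        "rows": rows,
        "values": values,
    }
    # Prefer session_id; fall back to the deprecated filepath
    if session_id:
        args["session_id"] = session_id
    else:
        args["filepath"] = filepath
    # Keep only the optional parameters that were actually set
    args.update({k: v for k, v in optional.items() if v is not None})
    return args


example = build_pivot_args(
    sheet_name="Sales",            # illustrative sheet name
    data_range="A1:E100",
    rows=["Region"],               # illustrative header names
    values=["Revenue"],
    session_id="example-session",  # placeholder; use the ID from open_workbook
    columns=["Quarter"],
    agg_func="sum",
)
print(json.dumps(example, indent=2))
```

Field names in rows, columns, and values must match the header row of data_range; names that do not match are reported as warnings rather than raising an error.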

Input Schema

Name          Required  Description  Default
sheet_name    Yes       -            -
data_range    Yes       -            -
rows          Yes       -            -
values        Yes       -            -
session_id    No        -            -
filepath      No        -            -
columns       No        -            -
agg_func      No        -            mean
target_sheet  No        -            -
target_cell   No        -            -
pivot_name    No        -            -
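
The schema above can be mirrored in a few lines of Python to check a payload before sending it. This is only a sketch: `REQUIRED`, `DEFAULTS`, `missing_required`, and `with_defaults` are hypothetical names, and the defaults are taken from the tool signature shown in the implementation reference (None for every optional parameter except agg_func).

```python
from typing import Any, Dict, List

# Parameters marked "Yes" in the Required column
REQUIRED = ("sheet_name", "data_range", "rows", "values")

# Optional parameters and their defaults
DEFAULTS: Dict[str, Any] = {
    "session_id": None,
    "filepath": None,
    "columns": None,
    "agg_func": "mean",
    "target_sheet": None,
    "target_cell": None,
    "pivot_name": None,
}


def missing_required(args: Dict[str, Any]) -> List[str]:
    """Return the required parameter names absent from args, in schema order."""
    return [name for name in REQUIRED if name not in args]


def with_defaults(args: Dict[str, Any]) -> Dict[str, Any]:
    """Fill in schema defaults; reject names the schema does not define."""
    unknown = set(args) - set(REQUIRED) - set(DEFAULTS)
    if unknown:
        raise KeyError(f"unknown parameters: {sorted(unknown)}")
    return {**DEFAULTS, **args}
```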

Implementation Reference

  • MCP tool registration for 'create_pivot_table'. A thin wrapper that accepts either session_id or the legacy filepath and delegates to the xlwings implementation functions create_pivot_table_xlw_with_wb (session) or create_pivot_table_xlw (filepath).
    @mcp.tool()
    def create_pivot_table(
        sheet_name: str,
        data_range: str,
        rows: List[str],
        values: List[str],
        session_id: Optional[str] = None,
        filepath: Optional[str] = None,
        columns: Optional[List[str]] = None,
        agg_func: str = "mean",
        target_sheet: Optional[str] = None,
        target_cell: Optional[str] = None,
        pivot_name: Optional[str] = None
    ) -> str:
        """
        Create pivot table in worksheet.
        
        Args:
            sheet_name: Name of worksheet containing source data
            data_range: Source data range (e.g., "A1:E100" or "Sheet2!A1:E100")
            rows: Field names for row labels
            values: Field names for values
            session_id: Session ID from open_workbook (preferred)
            filepath: Path to Excel file (legacy, deprecated)
            columns: Field names for column labels (optional)
            agg_func: Aggregation function (sum, count, average, max, min; "mean" is an alias for average and is the default here)
            target_sheet: Target sheet for pivot table (optional, auto-created if not exists)
            target_cell: Target cell for pivot table (optional, finds empty area if not provided)
            pivot_name: Custom name for pivot table (optional, auto-generated if not provided)
            
        Note: Use session_id for better performance. filepath parameter is deprecated.
        """
        try:
            # Support both new (session_id) and old (filepath) API
            if session_id:
                # New API: use session
                session = SESSION_MANAGER.get_session(session_id)
                if not session:
                    return ERROR_TEMPLATES['SESSION_NOT_FOUND'].format(
                        session_id=session_id, 
                        ttl=10  # Default TTL is 10 minutes (600 seconds)
                    )
                
                with session.lock:
                    from xlwings_mcp.xlwings_impl.advanced_xlw import create_pivot_table_xlw_with_wb
                    result = create_pivot_table_xlw_with_wb(
                        session.workbook,
                        sheet_name=sheet_name,
                        data_range=data_range,
                        rows=rows,
                        values=values,
                        columns=columns,
                        agg_func=agg_func,
                        target_sheet=target_sheet,
                        target_cell=target_cell,
                        pivot_name=pivot_name
                    )
            elif filepath:
                # Legacy API: backwards compatibility
                logger.warning("Using deprecated filepath parameter. Please use session_id instead.")
                full_path = get_excel_path(filepath)
                from xlwings_mcp.xlwings_impl.advanced_xlw import create_pivot_table_xlw
                result = create_pivot_table_xlw(
                    filepath=full_path,
                    sheet_name=sheet_name,
                    data_range=data_range,
                    rows=rows,
                    values=values,
                    columns=columns,
                    agg_func=agg_func,
                    target_sheet=target_sheet,
                    target_cell=target_cell,
                    pivot_name=pivot_name
                )
            else:
                return ERROR_TEMPLATES['PARAMETER_MISSING'].format(
                    param1='session_id',
                    param2='filepath'
                )
            
            # Handle warnings in response
            if "warnings" in result and result["warnings"]:
                warning_msg = "; ".join(result["warnings"])
                return f"{result.get('message', 'Pivot table created')} (Warnings: {warning_msg})"
            
            if "error" in result:
                return f"Error: {result['error']}"
            return result.get("message", "Pivot table created successfully")
            
        except (ValidationError, PivotError) as e:
            return f"Error: {str(e)}"
        except Exception as e:
            logger.error(f"Error creating pivot table: {e}")
            raise
  • File-path handler that implements pivot table creation through the Excel COM API exposed by xlwings. It opens and closes the workbook itself, creates the pivot cache, adds row, column, and value fields, sets the aggregation, applies a style, and returns errors as a dict.
    def create_pivot_table_xlw(
        filepath: str,
        sheet_name: str,
        data_range: str,
        rows: List[str],
        values: List[str],
        columns: Optional[List[str]] = None,
        agg_func: str = "sum",
        target_sheet: Optional[str] = None,
        target_cell: Optional[str] = None,
        pivot_name: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Create a pivot table in Excel using xlwings.
        
        Args:
            filepath: Path to Excel file
            sheet_name: Name of worksheet containing source data
            data_range: Source data range (e.g., "A1:E100" or "Sheet2!A1:E100")
            rows: Field names for row labels
            values: Field names for values
            columns: Field names for column labels (optional)
            agg_func: Aggregation function (sum, count, average, max, min)
            target_sheet: Target sheet for pivot table (optional)
            target_cell: Target cell for pivot table (optional, default finds empty area)
            pivot_name: Custom name for pivot table (optional)
            
        Returns:
            Dict with success message or error
        """
        app = None
        wb = None
    
        # Initialize COM for thread safety (Windows)
        _com_initialize()
    
        try:
            logger.info(f"Creating pivot table in {sheet_name}")
            
            # Check if file exists
            if not os.path.exists(filepath):
                return {"error": f"File not found: {filepath}"}
            
            # Open Excel app and workbook
            app = xw.App(visible=False, add_book=False)
            wb = app.books.open(filepath)
            
            # Check if sheet exists
            sheet_names = [s.name for s in wb.sheets]
            if sheet_name not in sheet_names:
                return {"error": f"Sheet '{sheet_name}' not found"}
            
            # Parse data range to support cross-sheet references
            if "!" in data_range:
                # Format: "SheetName!A1:E100"
                source_sheet_name, range_part = data_range.split("!", 1)
                # Remove quotes if present
                source_sheet_name = source_sheet_name.strip("'\"")
                if source_sheet_name not in sheet_names:
                    return {"error": f"Source sheet '{source_sheet_name}' not found"}
                source_sheet = wb.sheets[source_sheet_name]
                source_range = source_sheet.range(range_part)
            else:
                # Use the provided sheet_name
                source_sheet = wb.sheets[sheet_name]
                source_range = source_sheet.range(data_range)
            
            # Determine target sheet for pivot table
            if target_sheet:
                # Use specified target sheet
                if target_sheet not in sheet_names:
                    # Create if doesn't exist
                    pivot_sheet = wb.sheets.add(target_sheet)
                else:
                    pivot_sheet = wb.sheets[target_sheet]
            else:
                # Auto-generate unique pivot sheet name
                pivot_sheet_name = "PivotTable"
                counter = 1
                while pivot_sheet_name in sheet_names:
                    pivot_sheet_name = f"PivotTable{counter}"
                    counter += 1
                pivot_sheet = wb.sheets.add(pivot_sheet_name)
            
            # Determine target cell position
            if not target_cell:
                # Find empty area automatically
                used_range = pivot_sheet.used_range
                if used_range:
                    # Place below existing content with some spacing
                    target_cell = f"A{used_range.last_cell.row + 3}"
                else:
                    target_cell = "A3"  # Default position if sheet is empty
            
            # Use COM API to create pivot table
            pivot_cache = wb.api.PivotCaches().Create(
                SourceType=1,  # xlDatabase
                SourceData=source_range.api
            )
            
            # Generate unique pivot table name if not provided
            if not pivot_name:
                existing_pivots = []
                # Collect existing pivot table names; skip sheets that cannot be queried
                for sheet in wb.sheets:
                    try:
                        sheet_pivots = sheet.api.PivotTables()
                        for i in range(1, sheet_pivots.Count + 1):
                            existing_pivots.append(sheet_pivots.Item(i).Name)
                    except Exception:
                        pass
                
                # Generate unique name
                pivot_name = "PivotTable1"
                counter = 1
                while pivot_name in existing_pivots:
                    counter += 1
                    pivot_name = f"PivotTable{counter}"
            
            pivot_table = pivot_cache.CreatePivotTable(
                TableDestination=pivot_sheet.range(target_cell).api,
                TableName=pivot_name
            )
            
            # Get field names from first row of data (use source_range which is already parsed)
            header_range = source_range.rows[0]
            field_names = [cell.value for cell in header_range]
            
            # Track warnings for partial failures
            warnings = []
            
            # Add row fields - try different COM API access methods
            for row_field in rows:
                if row_field in field_names:
                    success = False
                    try:
                        # Method 1: Direct string access
                        field = pivot_table.PivotFields(row_field)
                        field.Orientation = 1  # xlRowField
                        success = True
                    except:
                        try:
                            # Method 2: Index access
                            field_index = field_names.index(row_field) + 1
                            field = pivot_table.PivotFields(field_index)
                            field.Orientation = 1  # xlRowField
                            success = True
                        except Exception as e:
                            error_msg = f"Failed to add row field '{row_field}': {str(e)}"
                            logger.warning(error_msg)
                            warnings.append(error_msg)
                else:
                    warnings.append(f"Row field '{row_field}' not found in data headers")
            
            # Add column fields
            if columns:
                for col_field in columns:
                    if col_field in field_names:
                        success = False
                        try:
                            # Method 1: Direct string access
                            field = pivot_table.PivotFields(col_field)
                            field.Orientation = 2  # xlColumnField
                            success = True
                        except:
                            try:
                                # Method 2: Index access
                                field_index = field_names.index(col_field) + 1
                                field = pivot_table.PivotFields(field_index)
                                field.Orientation = 2  # xlColumnField
                                success = True
                            except Exception as e:
                                error_msg = f"Failed to add column field '{col_field}': {str(e)}"
                                logger.warning(error_msg)
                                warnings.append(error_msg)
                    else:
                        warnings.append(f"Column field '{col_field}' not found in data headers")
            
            # Add value fields with aggregation
            # Note: Aggregation function setting is simplified for stability
            # Users can change aggregation type in Excel after creation
            
            for value_field in values:
                if value_field in field_names:
                    success = False
                    try:
                        # Method 1: Direct string access
                        field = pivot_table.PivotFields(value_field)
                        field.Orientation = 4  # xlDataField
                        success = True
                        logger.info(f"Added value field '{value_field}' successfully")
                    except:
                        try:
                            # Method 2: Index access
                            field_index = field_names.index(value_field) + 1
                            field = pivot_table.PivotFields(field_index)
                            field.Orientation = 4  # xlDataField
                            success = True
                            logger.info(f"Added value field '{value_field}' using index")
                        except Exception as e:
                            error_msg = f"Failed to add value field '{value_field}': {str(e)}"
                            logger.warning(error_msg)
                            warnings.append(error_msg)
                    
                    # Try to set aggregation function if field was added successfully
                    # This is optional - if it fails, the default (usually Sum) will be used
                    if success and agg_func.lower() != 'sum':
                        try:
                            # Safer approach: iterate through DataFields to find our field
                            agg_map = {
                                'count': -4112,    # xlCount
                                'average': -4106,  # xlAverage
                                'mean': -4106,     # xlAverage (alias)
                                'max': -4136,      # xlMax
                                'min': -4139,      # xlMin
                            }
                            
                            if agg_func.lower() in agg_map:
                                # Wait a moment for COM to update
                                import time
                                time.sleep(0.1)
                                
                                # Try to find and update the data field
                                for i in range(1, pivot_table.DataFields.Count + 1):
                                    try:
                                        data_field = pivot_table.DataFields(i)
                                        # Check if this is our field (name contains the original field name)
                                        if value_field in str(data_field.SourceName):
                                            data_field.Function = agg_map[agg_func.lower()]
                                            logger.info(f"Set aggregation to {agg_func} for {value_field}")
                                            break
                                    except:
                                        continue
                        except Exception as e:
                            # Non-critical: aggregation function setting failed
                            logger.debug(f"Could not set aggregation function for {value_field}: {e}")
                            # Don't add to warnings - field was added successfully
                else:
                    warnings.append(f"Value field '{value_field}' not found in data headers")
            
            # Apply default pivot table style
            pivot_table.TableStyle2 = "PivotStyleMedium9"
            
            # Save the workbook
            wb.save()
            
            # Prepare result
            result = {
                "message": f"Successfully created pivot table '{pivot_name}'",
                "pivot_name": pivot_name,
                "pivot_sheet": pivot_sheet.name,
                "pivot_cell": target_cell,
                "source_range": data_range,
                "source_sheet": source_sheet.name,
                "rows": rows,
                "columns": columns or [],
                "values": values,
                "aggregation": agg_func
            }
            
            # Add warnings if any
            if warnings:
                result["warnings"] = warnings
                logger.info(f"⚠️ Pivot table created with warnings: {warnings}")
            else:
                logger.info(f"✅ Successfully created pivot table '{pivot_name}' at {pivot_sheet.name}!{target_cell}")
            
            return result
            
        except Exception as e:
            logger.error(f"❌ Error creating pivot table: {str(e)}")
            return {"error": str(e)}
            
        finally:
            if wb:
                wb.close()
            if app:
                app.quit()
  • Session-based variant of the pivot table handler, used when the workbook is already open via session_id. The core logic matches the file-path handler, but it reuses the existing workbook object and does not start or quit Excel.
    def create_pivot_table_xlw_with_wb(
        wb,
        sheet_name: str,
        data_range: str,
        rows: List[str],
        values: List[str],
        columns: Optional[List[str]] = None,
        agg_func: str = "sum",
        target_sheet: Optional[str] = None,
        target_cell: Optional[str] = None,
        pivot_name: Optional[str] = None
    ) -> Dict[str, Any]:
        """Session-based version using existing workbook object.
        
        Args:
            wb: Workbook object from session
            sheet_name: Name of worksheet containing source data
            data_range: Source data range (e.g., "A1:E100" or "Sheet2!A1:E100")
            rows: Field names for row labels
            values: Field names for values
            columns: Field names for column labels (optional)
            agg_func: Aggregation function (sum, count, average, max, min)
            target_sheet: Target sheet for pivot table (optional)
            target_cell: Target cell for pivot table (optional, default finds empty area)
            pivot_name: Custom name for pivot table (optional)
            
        Returns:
            Dict with success message or error
        """
        try:
            logger.info(f"📊 Creating pivot table in {sheet_name}")
            
            # Check if sheet exists
            sheet_names = [s.name for s in wb.sheets]
            if sheet_name not in sheet_names:
                return {"error": f"Sheet '{sheet_name}' not found"}
            
            # Parse data range to support cross-sheet references
            if "!" in data_range:
                # Format: "SheetName!A1:E100"
                source_sheet_name, range_part = data_range.split("!", 1)
                # Remove quotes if present
                source_sheet_name = source_sheet_name.strip('\'"')
                if source_sheet_name not in sheet_names:
                    return {"error": f"Source sheet '{source_sheet_name}' not found"}
                source_sheet = wb.sheets[source_sheet_name]
                source_range = source_sheet.range(range_part)
            else:
                # Use the provided sheet_name
                source_sheet = wb.sheets[sheet_name]
                source_range = source_sheet.range(data_range)
            
            # Determine target sheet for pivot table
            if target_sheet:
                # Use specified target sheet
                if target_sheet not in sheet_names:
                    # Create if doesn't exist
                    pivot_sheet = wb.sheets.add(target_sheet)
                else:
                    pivot_sheet = wb.sheets[target_sheet]
            else:
                # Auto-generate unique pivot sheet name
                pivot_sheet_name = "PivotTable"
                counter = 1
                while pivot_sheet_name in sheet_names:
                    pivot_sheet_name = f"PivotTable{counter}"
                    counter += 1
                pivot_sheet = wb.sheets.add(pivot_sheet_name)
            
            # Determine target cell position
            if not target_cell:
                # Find empty area automatically
                used_range = pivot_sheet.used_range
                if used_range:
                    # Place below existing content with some spacing
                    target_cell = f"A{used_range.last_cell.row + 3}"
                else:
                    target_cell = "A3"  # Default position if sheet is empty
            
            # Use COM API to create pivot table
            pivot_cache = wb.api.PivotCaches().Create(
                SourceType=1,  # xlDatabase
                SourceData=source_range.api
            )
            
            # Generate unique pivot table name if not provided
            if not pivot_name:
                existing_pivots = []
                # Collect existing pivot table names; skip sheets that cannot be queried
                for sheet in wb.sheets:
                    try:
                        sheet_pivots = sheet.api.PivotTables()
                        for i in range(1, sheet_pivots.Count + 1):
                            existing_pivots.append(sheet_pivots.Item(i).Name)
                    except Exception:
                        pass
                
                # Generate unique name
                pivot_name = "PivotTable1"
                counter = 1
                while pivot_name in existing_pivots:
                    counter += 1
                    pivot_name = f"PivotTable{counter}"
            
            pivot_table = pivot_cache.CreatePivotTable(
                TableDestination=pivot_sheet.range(target_cell).api,
                TableName=pivot_name
            )
            
            # Get field names from first row of data (use source_range which is already parsed)
            header_range = source_range.rows[0]
            field_names = [cell.value for cell in header_range]
            
            # Track warnings for partial failures
            warnings = []
            
            # Add row fields - try different COM API access methods
            for row_field in rows:
                if row_field in field_names:
                    success = False
                    try:
                        # Method 1: Direct string access
                        field = pivot_table.PivotFields(row_field)
                        field.Orientation = 1  # xlRowField
                        success = True
                    except:
                        try:
                            # Method 2: Index access
                            field_index = field_names.index(row_field) + 1
                            field = pivot_table.PivotFields(field_index)
                            field.Orientation = 1  # xlRowField
                            success = True
                        except Exception as e:
                            error_msg = f"Failed to add row field '{row_field}': {str(e)}"
                            logger.warning(error_msg)
                            warnings.append(error_msg)
                else:
                    warnings.append(f"Row field '{row_field}' not found in data headers")
            
            # Add column fields
            if columns:
                for col_field in columns:
                    if col_field in field_names:
                        success = False
                        try:
                            # Method 1: Direct string access
                            field = pivot_table.PivotFields(col_field)
                            field.Orientation = 2  # xlColumnField
                            success = True
                        except:
                            try:
                                # Method 2: Index access
                                field_index = field_names.index(col_field) + 1
                                field = pivot_table.PivotFields(field_index)
                                field.Orientation = 2  # xlColumnField
                                success = True
                            except Exception as e:
                                error_msg = f"Failed to add column field '{col_field}': {str(e)}"
                                logger.warning(error_msg)
                                warnings.append(error_msg)
                    else:
                        warnings.append(f"Column field '{col_field}' not found in data headers")
            
            # Add value fields with aggregation
            for value_field in values:
                if value_field in field_names:
                    success = False
                    try:
                        # Method 1: Direct string access
                        field = pivot_table.PivotFields(value_field)
                        field.Orientation = 4  # xlDataField
                        success = True
                        logger.info(f"Added value field '{value_field}' successfully")
                    except:
                        try:
                            # Method 2: Index access
                            field_index = field_names.index(value_field) + 1
                            field = pivot_table.PivotFields(field_index)
                            field.Orientation = 4  # xlDataField
                            success = True
                            logger.info(f"Added value field '{value_field}' using index")
                        except Exception as e:
                            error_msg = f"Failed to add value field '{value_field}': {str(e)}"
                            logger.warning(error_msg)
                            warnings.append(error_msg)
                    
                    # Try to set aggregation function if field was added successfully
                    if success and agg_func.lower() != 'sum':
                        try:
                            # Safer approach: iterate through DataFields to find our field
                            agg_map = {
                                'count': -4112,    # xlCount
                                'average': -4106,  # xlAverage
                                'mean': -4106,     # xlAverage (alias)
                                'max': -4136,      # xlMax
                                'min': -4139,      # xlMin
                            }
                            
                            if agg_func.lower() in agg_map:
                                # Wait a moment for COM to update
                                import time
                                time.sleep(0.1)
                                
                                # Try to find and update the data field
                                for i in range(1, pivot_table.DataFields.Count + 1):
                                    try:
                                        data_field = pivot_table.DataFields(i)
                                        # Check if this is our field (name contains the original field name)
                                        if value_field in str(data_field.SourceName):
                                            data_field.Function = agg_map[agg_func.lower()]
                                            logger.info(f"Set aggregation to {agg_func} for {value_field}")
                                            break
                                    except:
                                        continue
                        except Exception as e:
                            # Non-critical: aggregation function setting failed
                            logger.debug(f"Could not set aggregation function for {value_field}: {e}")
                else:
                    warnings.append(f"Value field '{value_field}' not found in data headers")
            
            # Apply default pivot table style
            pivot_table.TableStyle2 = "PivotStyleMedium9"
            
            # Save the workbook
            wb.save()
            
            # Prepare result
            result = {
                "message": f"Successfully created pivot table '{pivot_name}'",
                "pivot_name": pivot_name,
                "pivot_sheet": pivot_sheet.name,
                "pivot_cell": target_cell,
                "source_range": data_range,
                "source_sheet": source_sheet.name,
                "rows": rows,
                "columns": columns or [],
                "values": values,
                "aggregation": agg_func
            }
            
            # Add warnings if any
            if warnings:
                result["warnings"] = warnings
                logger.info(f"⚠️ Pivot table created with warnings: {warnings}")
            else:
                logger.info(f"✅ Successfully created pivot table '{pivot_name}' at {pivot_sheet.name}!{target_cell}")
            
            return result
            
        except Exception as e:
            logger.error(f"❌ Error creating pivot table: {str(e)}")
            return {"error": str(e)}
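
Both handlers and the wrapper share a few steps that do not need Excel at all: splitting a "Sheet!A1:E100" reference, picking an unused pivot table name, mapping agg_func to an Excel constant, and turning the result dict into the returned string. The sketch below restates those steps as standalone functions so they can be read and tested in isolation. The function names are hypothetical and do not exist in the project; the constants and string formats are copied from the listings above.

```python
from typing import Any, Dict, Iterable, Optional, Tuple

# Excel aggregation constants used by the handlers ("sum" is left to Excel's default)
AGG_MAP = {
    "count": -4112,    # xlCount
    "average": -4106,  # xlAverage
    "mean": -4106,     # alias for xlAverage
    "max": -4136,      # xlMax
    "min": -4139,      # xlMin
}


def split_data_range(data_range: str, default_sheet: str) -> Tuple[str, str]:
    """Return (sheet, range), honouring an optional 'Sheet!A1:B2' prefix."""
    if "!" in data_range:
        sheet, cell_range = data_range.split("!", 1)
        # Strip the quotes Excel puts around sheet names that contain spaces
        return sheet.strip("'\""), cell_range
    return default_sheet, data_range


def unique_pivot_name(existing: Iterable[str]) -> str:
    """Return the first 'PivotTableN' (N >= 1) that is not already taken."""
    taken = set(existing)
    counter = 1
    while f"PivotTable{counter}" in taken:
        counter += 1
    return f"PivotTable{counter}"


def resolve_agg(agg_func: str) -> Optional[int]:
    """Return the constant to assign, or None to keep Excel's default."""
    return AGG_MAP.get(agg_func.lower())


def format_result(result: Dict[str, Any]) -> str:
    """Collapse a handler result dict into the string the tool returns."""
    if result.get("warnings"):
        joined = "; ".join(result["warnings"])
        return f"{result.get('message', 'Pivot table created')} (Warnings: {joined})"
    if "error" in result:
        return f"Error: {result['error']}"
    return result.get("message", "Pivot table created successfully")
```

Note that `resolve_agg` returns None both for "sum" and for any unrecognized name, which matches the listings: an unknown agg_func is not reported as a warning, and the pivot table keeps whatever aggregation Excel chose.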

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/hyunjae-labs/xlwings-mcp-server'
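
The same request can be made from Python using only the standard library. This is a minimal sketch: `server_url` and `fetch_server` are hypothetical helper names, and it assumes the endpoint returns a JSON body, which the page does not state.

```python
import json
import urllib.request
from typing import Any

API_BASE = "https://glama.ai/api/mcp/v1/servers"


def server_url(owner: str, name: str) -> str:
    """Build the directory API URL for one server."""
    return f"{API_BASE}/{owner}/{name}"


def fetch_server(owner: str, name: str, timeout: float = 10.0) -> Any:
    """GET the server record; assumes the response body is JSON."""
    with urllib.request.urlopen(server_url(owner, name), timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


# Example call (performs a network request):
# record = fetch_server("hyunjae-labs", "xlwings-mcp-server")
```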

If you have feedback or need assistance with the MCP directory API, please join our Discord server.