create_pivot_table
Generate pivot tables in Excel by specifying source data, row/column labels, values, and aggregation functions. Automate data summarization and analysis for improved insights directly within your worksheets.
Instructions
Create pivot table in worksheet.
Args:
filepath: Path to Excel file
sheet_name: Name of worksheet containing source data
data_range: Source data range (e.g., "A1:E100" or "Sheet2!A1:E100")
rows: Field names for row labels
values: Field names for values
columns: Field names for column labels (optional)
agg_func: Aggregation function (sum, count, average or its alias mean, max, min; defaults to mean)
target_sheet: Target sheet for pivot table (optional, auto-created if not exists)
target_cell: Target cell for pivot table (optional, finds empty area if not provided)
pivot_name: Custom name for pivot table (optional, auto-generated if not provided)
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| agg_func | No | | mean |
| columns | No | ||
| data_range | Yes | ||
| filepath | Yes | ||
| pivot_name | No | ||
| rows | Yes | ||
| sheet_name | Yes | ||
| target_cell | No | ||
| target_sheet | No | ||
| values | Yes | | |
Implementation Reference
# - src/xlwings_mcp/server.py:658-750 (registration) MCP tool registration defining the
#   'create_pivot_table' tool with parameters and docstring schema. Delegates to
#   session-based or legacy xlwings implementations.
@mcp.tool()
def create_pivot_table(
    sheet_name: str,
    data_range: str,
    rows: List[str],
    values: List[str],
    session_id: Optional[str] = None,
    filepath: Optional[str] = None,
    columns: Optional[List[str]] = None,
    agg_func: str = "mean",
    target_sheet: Optional[str] = None,
    target_cell: Optional[str] = None,
    pivot_name: Optional[str] = None
) -> str:
    """
    Create pivot table in worksheet.

    Args:
        sheet_name: Name of worksheet containing source data
        data_range: Source data range (e.g., "A1:E100" or "Sheet2!A1:E100")
        rows: Field names for row labels
        values: Field names for values
        session_id: Session ID from open_workbook (preferred)
        filepath: Path to Excel file (legacy, deprecated)
        columns: Field names for column labels (optional)
        agg_func: Aggregation function: sum, count, average (alias: mean), max, min.
            Defaults to "mean".
        target_sheet: Target sheet for pivot table (optional, auto-created if not exists)
        target_cell: Target cell for pivot table (optional, finds empty area if not provided)
        pivot_name: Custom name for pivot table (optional, auto-generated if not provided)

    Returns:
        A human-readable status string: the success message (with any warnings
        appended in parentheses), or a string starting with "Error: ".

    Note: Use session_id for better performance. filepath parameter is deprecated.
    """
    try:
        # Support both new (session_id) and old (filepath) API
        if session_id:
            # New API: operate on the workbook already held by the session
            session = SESSION_MANAGER.get_session(session_id)
            if not session:
                return ERROR_TEMPLATES['SESSION_NOT_FOUND'].format(
                    session_id=session_id,
                    ttl=10  # Default TTL is 10 minutes (600 seconds)
                )

            # Hold the session lock for the whole pivot operation
            with session.lock:
                from xlwings_mcp.xlwings_impl.advanced_xlw import create_pivot_table_xlw_with_wb
                result = create_pivot_table_xlw_with_wb(
                    session.workbook,
                    sheet_name=sheet_name,
                    data_range=data_range,
                    rows=rows,
                    values=values,
                    columns=columns,
                    agg_func=agg_func,
                    target_sheet=target_sheet,
                    target_cell=target_cell,
                    pivot_name=pivot_name
                )
        elif filepath:
            # Legacy API: backwards compatibility
            logger.warning("Using deprecated filepath parameter. Please use session_id instead.")
            full_path = get_excel_path(filepath)
            from xlwings_mcp.xlwings_impl.advanced_xlw import create_pivot_table_xlw
            result = create_pivot_table_xlw(
                filepath=full_path,
                sheet_name=sheet_name,
                data_range=data_range,
                rows=rows,
                values=values,
                columns=columns,
                agg_func=agg_func,
                target_sheet=target_sheet,
                target_cell=target_cell,
                pivot_name=pivot_name
            )
        else:
            return ERROR_TEMPLATES['PARAMETER_MISSING'].format(
                param1='session_id',
                param2='filepath'
            )

        # Report a failure before anything else so it can never be masked by
        # the success-with-warnings branch below.
        if "error" in result:
            return f"Error: {result['error']}"

        # Handle warnings in response
        if result.get("warnings"):
            warning_msg = "; ".join(result["warnings"])
            return f"{result.get('message', 'Pivot table created')} (Warnings: {warning_msg})"

        return result.get("message", "Pivot table created successfully")

    except (ValidationError, PivotError) as e:
        return f"Error: {str(e)}"
    except Exception as e:
        logger.error(f"Error creating pivot table: {e}")
        raise
# - Primary handler implementation for session-based pivot table creation using existing
#   workbook object. Handles pivot cache creation, field assignments, aggregation, and styling.

# Excel XlPivotFieldOrientation constants.
_XL_ROW_FIELD = 1
_XL_COLUMN_FIELD = 2
_XL_DATA_FIELD = 4

# Excel XlConsolidationFunction constants for the non-default aggregations.
# "sum" is deliberately absent: it is left to Excel's own default.
_PIVOT_AGG_FUNCTIONS = {
    'count': -4112,    # xlCount
    'average': -4106,  # xlAverage
    'mean': -4106,     # xlAverage (alias)
    'max': -4136,      # xlMax
    'min': -4139,      # xlMin
}


def _add_pivot_field(pivot_table, field_names, field_name, orientation, label, warnings):
    """Place one source field on the pivot table; return True on success.

    The field is looked up by name first and by 1-based header position as a
    fallback. Every failure is appended to ``warnings`` instead of raising.
    """
    if field_name not in field_names:
        warnings.append(f"{label} field '{field_name}' not found in data headers")
        return False

    try:
        # Method 1: Direct string access
        pivot_table.PivotFields(field_name).Orientation = orientation
        return True
    except Exception as e:
        logger.debug(f"Name lookup failed for field '{field_name}', retrying by index: {e}")

    try:
        # Method 2: Index access (COM collections are 1-based)
        field_index = field_names.index(field_name) + 1
        pivot_table.PivotFields(field_index).Orientation = orientation
        return True
    except Exception as e:
        error_msg = f"Failed to add {label.lower()} field '{field_name}': {str(e)}"
        logger.warning(error_msg)
        warnings.append(error_msg)
        return False


def _set_data_field_function(pivot_table, value_field, function_code):
    """Set the aggregation of the data field sourced from ``value_field``.

    Returns True when a data field whose SourceName equals ``value_field`` was
    found and updated, False otherwise.
    """
    import time

    try:
        # Wait a moment for COM to update
        time.sleep(0.1)
        data_field_count = pivot_table.DataFields.Count
    except Exception as e:
        logger.debug(f"Could not enumerate data fields for {value_field}: {e}")
        return False

    for i in range(1, data_field_count + 1):
        try:
            data_field = pivot_table.DataFields(i)
            # Exact comparison: a substring test would let "Sales" match "Sales Tax"
            if str(data_field.SourceName) == str(value_field):
                data_field.Function = function_code
                return True
        except Exception as e:
            logger.debug(f"Skipping data field {i} while setting aggregation: {e}")
    return False


def _unique_pivot_name(wb):
    """Return the first 'PivotTable<N>' name not used by any sheet in ``wb``."""
    existing_pivots = set()
    for sheet in wb.sheets:
        try:
            sheet_pivots = sheet.api.PivotTables()
            for i in range(1, sheet_pivots.Count + 1):
                existing_pivots.add(sheet_pivots.Item(i).Name)
        except Exception as e:
            # Best effort: a sheet that cannot list its pivots is ignored
            logger.debug(f"Could not list pivot tables on a sheet: {e}")

    counter = 1
    while f"PivotTable{counter}" in existing_pivots:
        counter += 1
    return f"PivotTable{counter}"


def create_pivot_table_xlw_with_wb(
    wb,
    sheet_name: str,
    data_range: str,
    rows: List[str],
    values: List[str],
    columns: Optional[List[str]] = None,
    agg_func: str = "sum",
    target_sheet: Optional[str] = None,
    target_cell: Optional[str] = None,
    pivot_name: Optional[str] = None
) -> Dict[str, Any]:
    """Session-based version using existing workbook object.

    Args:
        wb: Workbook object from session
        sheet_name: Name of worksheet containing source data
        data_range: Source data range (e.g., "A1:E100" or "Sheet2!A1:E100")
        rows: Field names for row labels
        values: Field names for values
        columns: Field names for column labels (optional)
        agg_func: Aggregation function (sum, count, average/mean, max, min).
            "sum" leaves Excel's default aggregation untouched; an unsupported
            name is reported as a warning and the default is kept.
        target_sheet: Target sheet for pivot table (optional, created if missing;
            a new "PivotTable<N>" sheet is added when omitted)
        target_cell: Target cell for pivot table (optional, default finds empty area)
        pivot_name: Custom name for pivot table (optional)

    Returns:
        Dict with the success message and pivot details (plus a "warnings" list
        for partial failures), or {"error": ...} when creation failed.
    """
    try:
        logger.info(f"Creating pivot table in {sheet_name}")

        # Check if sheet exists
        sheet_names = [s.name for s in wb.sheets]
        if sheet_name not in sheet_names:
            return {"error": f"Sheet '{sheet_name}' not found"}

        # Parse data range to support cross-sheet references
        if "!" in data_range:
            # Format: "SheetName!A1:E100". Split on the LAST "!" because a
            # quoted sheet name may itself contain one; the address never does.
            source_sheet_name, range_part = data_range.rsplit("!", 1)
            source_sheet_name = source_sheet_name.strip('\'"')
            if source_sheet_name not in sheet_names:
                return {"error": f"Source sheet '{source_sheet_name}' not found"}
            source_sheet = wb.sheets[source_sheet_name]
            source_range = source_sheet.range(range_part)
        else:
            source_sheet = wb.sheets[sheet_name]
            source_range = source_sheet.range(data_range)

        # Determine target sheet for pivot table
        if target_sheet:
            if target_sheet not in sheet_names:
                pivot_sheet = wb.sheets.add(target_sheet)
            else:
                pivot_sheet = wb.sheets[target_sheet]
        else:
            # Auto-generate unique pivot sheet name
            pivot_sheet_name = "PivotTable"
            counter = 1
            while pivot_sheet_name in sheet_names:
                pivot_sheet_name = f"PivotTable{counter}"
                counter += 1
            pivot_sheet = wb.sheets.add(pivot_sheet_name)

        # Determine target cell position
        if not target_cell:
            used_range = pivot_sheet.used_range
            if used_range:
                # Place below existing content with some spacing
                target_cell = f"A{used_range.last_cell.row + 3}"
            else:
                target_cell = "A3"  # Default position if sheet is empty

        # Use COM API to create pivot table
        pivot_cache = wb.api.PivotCaches().Create(
            SourceType=1,  # xlDatabase
            SourceData=source_range.api
        )

        if not pivot_name:
            pivot_name = _unique_pivot_name(wb)

        pivot_table = pivot_cache.CreatePivotTable(
            TableDestination=pivot_sheet.range(target_cell).api,
            TableName=pivot_name
        )

        # Field names come from the first row of the (already parsed) source range
        field_names = [cell.value for cell in source_range.rows[0]]

        # Track warnings for partial failures
        warnings = []

        for row_field in rows:
            _add_pivot_field(pivot_table, field_names, row_field, _XL_ROW_FIELD, "Row", warnings)

        for col_field in columns or []:
            _add_pivot_field(pivot_table, field_names, col_field, _XL_COLUMN_FIELD, "Column", warnings)

        # Resolve the aggregation once; None means "keep Excel's default"
        agg_key = agg_func.lower()
        function_code = _PIVOT_AGG_FUNCTIONS.get(agg_key)
        if agg_key != 'sum' and function_code is None:
            supported = ", ".join(["sum"] + list(_PIVOT_AGG_FUNCTIONS))
            warnings.append(
                f"Unsupported aggregation function '{agg_func}' "
                f"(supported: {supported}); Excel's default aggregation was used"
            )

        for value_field in values:
            if not _add_pivot_field(
                pivot_table, field_names, value_field, _XL_DATA_FIELD, "Value", warnings
            ):
                continue
            logger.info(f"Added value field '{value_field}' successfully")

            if function_code is None:
                continue
            if _set_data_field_function(pivot_table, value_field, function_code):
                logger.info(f"Set aggregation to {agg_func} for {value_field}")
            else:
                # Surface this: the result reports agg_func, so a silent
                # fallback would misdescribe the numbers in the pivot table.
                error_msg = (
                    f"Could not set aggregation '{agg_func}' for value field "
                    f"'{value_field}'; Excel's default aggregation was kept"
                )
                logger.warning(error_msg)
                warnings.append(error_msg)

        # Apply default pivot table style
        pivot_table.TableStyle2 = "PivotStyleMedium9"

        # Save the workbook
        wb.save()

        result = {
            "message": f"Successfully created pivot table '{pivot_name}'",
            "pivot_name": pivot_name,
            "pivot_sheet": pivot_sheet.name,
            "pivot_cell": target_cell,
            "source_range": data_range,
            "source_sheet": source_sheet.name,
            "rows": rows,
            "columns": columns or [],
            "values": values,
            "aggregation": agg_func
        }

        if warnings:
            result["warnings"] = warnings
            logger.info(f"Pivot table created with warnings: {warnings}")
        else:
            logger.info(
                f"Successfully created pivot table '{pivot_name}' at {pivot_sheet.name}!{target_cell}"
            )

        return result

    except Exception as e:
        logger.error(f"Error creating pivot table: {str(e)}")
        return {"error": str(e)}
# - Legacy filepath-based handler for pivot table creation. Opens Excel app and workbook,
#   executes same pivot logic, then closes.
def create_pivot_table_xlw(
    filepath: str,
    sheet_name: str,
    data_range: str,
    rows: List[str],
    values: List[str],
    columns: Optional[List[str]] = None,
    agg_func: str = "sum",
    target_sheet: Optional[str] = None,
    target_cell: Optional[str] = None,
    pivot_name: Optional[str] = None
) -> Dict[str, Any]:
    """
    Create a pivot table in Excel using xlwings.

    Opens the workbook in a hidden Excel instance, runs the same pivot logic
    as the session-based create_pivot_table_xlw_with_wb (by delegating to it),
    then closes the workbook and quits Excel.

    Args:
        filepath: Path to Excel file
        sheet_name: Name of worksheet containing source data
        data_range: Source data range (e.g., "A1:E100" or "Sheet2!A1:E100")
        rows: Field names for row labels
        values: Field names for values
        columns: Field names for column labels (optional)
        agg_func: Aggregation function (sum, count, average, max, min)
        target_sheet: Target sheet for pivot table (optional)
        target_cell: Target cell for pivot table (optional, default finds empty area)
        pivot_name: Custom name for pivot table (optional)

    Returns:
        Dict with success message or error
    """
    app = None
    wb = None

    # Initialize COM for thread safety (Windows)
    _com_initialize()

    try:
        logger.info(f"Creating pivot table in {sheet_name}")

        # Check if file exists
        if not os.path.exists(filepath):
            return {"error": f"File not found: {filepath}"}

        # Open Excel app and workbook
        app = xw.App(visible=False, add_book=False)
        wb = app.books.open(filepath)

        # Single source of truth for the pivot logic: the workbook-based
        # implementation validates, builds, styles and saves the pivot table.
        return create_pivot_table_xlw_with_wb(
            wb,
            sheet_name=sheet_name,
            data_range=data_range,
            rows=rows,
            values=values,
            columns=columns,
            agg_func=agg_func,
            target_sheet=target_sheet,
            target_cell=target_cell,
            pivot_name=pivot_name
        )

    except Exception as e:
        logger.error(f"Error creating pivot table: {str(e)}")
        return {"error": str(e)}

    finally:
        # Nested so that a failing wb.close() can no longer skip app.quit()
        # and leave a hidden Excel process running.
        try:
            if wb is not None:
                wb.close()
        finally:
            if app is not None:
                app.quit()