Skip to main content
Glama

Office MCP Server

by walkingzzzy
excel_data.py26.7 kB
"""Excel 数据操作模块.""" from typing import Any, Optional, Union from openpyxl import load_workbook from openpyxl.worksheet.datavalidation import DataValidation from loguru import logger from office_mcp_server.config import config from office_mcp_server.utils.file_manager import FileManager class ExcelDataOperations: """Excel 数据操作类.""" def __init__(self) -> None: """初始化数据操作类.""" self.file_manager = FileManager() def sort_data( self, filename: str, sheet_name: str, data_range: str, sort_by_column: int = 0, ascending: bool = True, sort_keys: Optional[list[dict[str, Any]]] = None, ) -> dict[str, Any]: """对数据进行排序. Args: filename: 文件名 sheet_name: 工作表名称 data_range: 数据范围 sort_by_column: 排序列索引(单列排序时使用) ascending: 是否升序(单列排序时使用) sort_keys: 多列排序键列表,格式: [{"column": 0, "ascending": True}, {"column": 1, "ascending": False}] """ try: file_path = config.paths.output_dir / filename self.file_manager.validate_file_path(file_path, must_exist=True) wb = load_workbook(str(file_path)) if sheet_name not in wb.sheetnames: raise ValueError(f"工作表 '{sheet_name}' 不存在") ws = wb[sheet_name] # 获取数据范围 data_cells = ws[data_range] # 将数据转换为列表 data_list = [] for row in data_cells: row_data = [cell.value for cell in row] data_list.append(row_data) # 多列排序 if sort_keys and len(sort_keys) > 0: # 验证所有排序列 for key in sort_keys: col_idx = key.get("column", 0) if col_idx >= len(data_list[0]): raise ValueError(f"排序列索引 {col_idx} 超出范围") # 多列排序 def multi_key(row): keys = [] for key in sort_keys: col_idx = key.get("column", 0) asc = key.get("ascending", True) value = row[col_idx] if row[col_idx] is not None else "" # 对于降序,需要反转排序键 if not asc: # 对于数字,使用负数;对于字符串,需要特殊处理 if isinstance(value, (int, float)): keys.append(-value) else: keys.append((False, value)) # 标记为降序 else: if isinstance(value, (int, float)): keys.append(value) else: keys.append((True, value)) # 标记为升序 return keys # 自定义比较函数处理混合排序 from functools import cmp_to_key def compare(a, b): for key in sort_keys: col_idx = key.get("column", 0) asc = key.get("ascending", True) val_a = a[col_idx] if a[col_idx] is not None else "" val_b = b[col_idx] if b[col_idx] is not None else "" # 比较值 if val_a < val_b: result = -1 elif val_a > val_b: result = 1 else: result = 0 # 如果不相等,应用排序方向 if result != 0: return result if asc else -result return 0 sorted_data = sorted(data_list, key=cmp_to_key(compare)) else: # 单列排序 if sort_by_column >= len(data_list[0]): raise ValueError(f"排序列索引 {sort_by_column} 超出范围") sorted_data = sorted( data_list, key=lambda x: x[sort_by_column] if x[sort_by_column] is not None else "", reverse=not ascending ) # 写回数据 from openpyxl.utils import range_boundaries min_col, min_row, max_col, max_row = range_boundaries(data_range) for i, row_data in enumerate(sorted_data): for j, value in enumerate(row_data): ws.cell(row=min_row + i, column=min_col + j, value=value) wb.save(str(file_path)) wb.close() logger.info(f"数据排序成功: {file_path}") return { "success": True, "message": "数据排序成功", "filename": str(file_path), "sort_by_column": sort_by_column if not sort_keys else None, "ascending": ascending if not sort_keys else None, "sort_keys": sort_keys if sort_keys else None, } except Exception as e: logger.error(f"排序数据失败: {e}") return {"success": False, "message": f"排序失败: {str(e)}"} def filter_data( self, filename: str, sheet_name: str, data_range: str, filter_column: Optional[int] = None, filter_value: Optional[str] = None, filter_operator: str = "equals", enable_autofilter: bool = True, ) -> dict[str, Any]: """对数据进行筛选.""" try: file_path = config.paths.output_dir / filename self.file_manager.validate_file_path(file_path, must_exist=True) wb = load_workbook(str(file_path)) if sheet_name not in wb.sheetnames: raise ValueError(f"工作表 '{sheet_name}' 不存在") ws = wb[sheet_name] # 启用自动筛选 if enable_autofilter: ws.auto_filter.ref = data_range # 如果指定了筛选列和值,应用筛选 if filter_column is not None and filter_value is not None: from openpyxl.utils import range_boundaries from openpyxl.worksheet.filters import FilterColumn, CustomFilters, CustomFilter min_col, min_row, max_col, max_row = range_boundaries(data_range) if filter_column >= (max_col - min_col + 1): raise ValueError(f"筛选列索引 {filter_column} 超出范围") # 创建筛选条件 operator_map = { 'equals': 'equal', 'notEquals': 'notEqual', 'greaterThan': 'greaterThan', 'lessThan': 'lessThan', 'greaterThanOrEqual': 'greaterThanOrEqual', 'lessThanOrEqual': 'lessThanOrEqual', } if filter_operator in operator_map: filter_op = operator_map[filter_operator] custom_filter = CustomFilter(operator=filter_op, val=filter_value) custom_filters = CustomFilters(customFilter=[custom_filter]) filter_column_obj = FilterColumn(colId=filter_column, customFilters=custom_filters) ws.auto_filter.filterColumn.append(filter_column_obj) wb.save(str(file_path)) wb.close() logger.info(f"数据筛选成功: {file_path}") return { "success": True, "message": "数据筛选成功", "filename": str(file_path), "data_range": data_range, } except Exception as e: logger.error(f"筛选数据失败: {e}") return {"success": False, "message": f"筛选失败: {str(e)}"} def set_data_validation( self, filename: str, sheet_name: str, cell_range: str, validation_type: str, operator: Optional[str] = None, formula1: Optional[str] = None, formula2: Optional[str] = None, allow_blank: bool = True, show_dropdown: bool = True, prompt_title: Optional[str] = None, prompt: Optional[str] = None, error_title: Optional[str] = None, error: Optional[str] = None, ) -> dict[str, Any]: """设置数据验证.""" try: file_path = config.paths.output_dir / filename self.file_manager.validate_file_path(file_path, must_exist=True) wb = load_workbook(str(file_path)) if sheet_name not in wb.sheetnames: raise ValueError(f"工作表 '{sheet_name}' 不存在") ws = wb[sheet_name] # 创建数据验证对象 if validation_type == 'list': # 列表验证 if not formula1: raise ValueError("列表验证需要指定formula1(逗号分隔的值)") dv = DataValidation( type="list", formula1=f'"{formula1}"', allow_blank=allow_blank, showDropDown=not show_dropdown # 注意:这个参数是反的 ) elif validation_type == 'whole': # 整数验证 dv = DataValidation( type="whole", operator=operator or "between", formula1=formula1, formula2=formula2, allow_blank=allow_blank ) elif validation_type == 'decimal': # 小数验证 dv = DataValidation( type="decimal", operator=operator or "between", formula1=formula1, formula2=formula2, allow_blank=allow_blank ) elif validation_type == 'date': # 日期验证 dv = DataValidation( type="date", operator=operator or "between", formula1=formula1, formula2=formula2, allow_blank=allow_blank ) elif validation_type == 'time': # 时间验证 dv = DataValidation( type="time", operator=operator or "between", formula1=formula1, formula2=formula2, allow_blank=allow_blank ) elif validation_type == 'textLength': # 文本长度验证 dv = DataValidation( type="textLength", operator=operator or "between", formula1=formula1, formula2=formula2, allow_blank=allow_blank ) elif validation_type == 'custom': # 自定义公式验证 if not formula1: raise ValueError("自定义验证需要指定formula1") dv = DataValidation( type="custom", formula1=formula1, allow_blank=allow_blank ) else: raise ValueError(f"不支持的验证类型: {validation_type}") # 设置提示信息 if prompt_title and prompt: dv.prompt = prompt dv.promptTitle = prompt_title dv.showInputMessage = True # 设置错误信息 if error_title and error: dv.error = error dv.errorTitle = error_title dv.showErrorMessage = True # 应用数据验证 ws.add_data_validation(dv) dv.add(cell_range) wb.save(str(file_path)) wb.close() logger.info(f"数据验证设置成功: {file_path}") return { "success": True, "message": f"数据验证已应用到 {cell_range}", "filename": str(file_path), "validation_type": validation_type, "cell_range": cell_range, } except Exception as e: logger.error(f"设置数据验证失败: {e}") return {"success": False, "message": f"设置失败: {str(e)}"} def manage_worksheets( self, filename: str, operation: str, sheet_name: Optional[str] = None, new_name: Optional[str] = None, target_index: Optional[int] = None, ) -> dict[str, Any]: """管理工作表.""" try: file_path = config.paths.output_dir / filename self.file_manager.validate_file_path(file_path, must_exist=True) wb = load_workbook(str(file_path)) if operation == 'create': # 创建新工作表 if not sheet_name: sheet_name = f"Sheet{len(wb.sheetnames) + 1}" wb.create_sheet(title=sheet_name) message = f"工作表 '{sheet_name}' 创建成功" elif operation == 'delete': # 删除工作表 if not sheet_name: raise ValueError("删除工作表需要指定sheet_name") if sheet_name not in wb.sheetnames: raise ValueError(f"工作表 '{sheet_name}' 不存在") wb.remove(wb[sheet_name]) message = f"工作表 '{sheet_name}' 删除成功" elif operation == 'rename': # 重命名工作表 if not sheet_name or not new_name: raise ValueError("重命名工作表需要指定sheet_name和new_name") if sheet_name not in wb.sheetnames: raise ValueError(f"工作表 '{sheet_name}' 不存在") wb[sheet_name].title = new_name message = f"工作表 '{sheet_name}' 已重命名为 '{new_name}'" elif operation == 'copy': # 复制工作表 if not sheet_name: raise ValueError("复制工作表需要指定sheet_name") if sheet_name not in wb.sheetnames: raise ValueError(f"工作表 '{sheet_name}' 不存在") source_sheet = wb[sheet_name] if not new_name: new_name = f"{sheet_name}_Copy" target_sheet = wb.copy_worksheet(source_sheet) target_sheet.title = new_name message = f"工作表 '{sheet_name}' 已复制为 '{new_name}'" elif operation == 'move': # 移动工作表 if not sheet_name or target_index is None: raise ValueError("移动工作表需要指定sheet_name和target_index") if sheet_name not in wb.sheetnames: raise ValueError(f"工作表 '{sheet_name}' 不存在") wb.move_sheet(sheet_name, offset=target_index - wb.sheetnames.index(sheet_name)) message = f"工作表 '{sheet_name}' 已移动到位置 {target_index}" else: raise ValueError(f"不支持的操作类型: {operation}") wb.save(str(file_path)) wb.close() logger.info(f"工作表管理成功: {file_path}") return { "success": True, "message": message, "filename": str(file_path), "operation": operation, } except Exception as e: logger.error(f"管理工作表失败: {e}") return {"success": False, "message": f"操作失败: {str(e)}"} def create_table( self, filename: str, sheet_name: str, table_range: str, table_name: str, style: str = "TableStyleMedium9", show_header: bool = True, show_totals: bool = False, ) -> dict[str, Any]: """创建Excel表格样式. Args: filename: Excel文件名 sheet_name: 工作表名称 table_range: 表格范围,如 "A1:D10" table_name: 表格名称(必须唯一) style: 表格样式名称(如 TableStyleMedium9, TableStyleLight1等) show_header: 是否显示表头 show_totals: 是否显示汇总行 """ try: from openpyxl.worksheet.table import Table, TableStyleInfo file_path = config.paths.output_dir / filename self.file_manager.validate_file_path(file_path, must_exist=True) wb = load_workbook(str(file_path)) if sheet_name not in wb.sheetnames: raise ValueError(f"工作表 '{sheet_name}' 不存在") ws = wb[sheet_name] # 验证表格名称是否已存在 for table in ws.tables.values(): if table.name == table_name: raise ValueError(f"表格名称 '{table_name}' 已存在") # 创建表格 tab = Table(displayName=table_name, ref=table_range) # 设置表格样式 style_info = TableStyleInfo( name=style, showFirstColumn=False, showLastColumn=False, showRowStripes=True, showColumnStripes=False ) tab.tableStyleInfo = style_info # 添加表格到工作表 ws.add_table(tab) wb.save(str(file_path)) wb.close() logger.info(f"Excel表格创建成功: {table_name}") return { "success": True, "message": f"表格 '{table_name}' 创建成功", "filename": str(file_path), "sheet_name": sheet_name, "table_name": table_name, "table_range": table_range, "style": style, } except Exception as e: logger.error(f"创建Excel表格失败: {e}") return {"success": False, "message": f"创建表格失败: {str(e)}"} def sort_data_by_color( self, filename: str, sheet_name: str, data_range: str, sort_by_column: int, color_order: list[str], sort_by: str = "fill", ) -> dict[str, Any]: """按颜色排序数据. Args: filename: 文件名 sheet_name: 工作表名称 data_range: 数据范围 sort_by_column: 排序列索引 color_order: 颜色顺序列表(HEX格式,如 ['#FF0000', '#00FF00', '#0000FF']) sort_by: 排序依据 ('fill'按填充色, 'font'按字体色, 默认 'fill') """ try: file_path = config.paths.output_dir / filename self.file_manager.validate_file_path(file_path, must_exist=True) wb = load_workbook(str(file_path)) if sheet_name not in wb.sheetnames: raise ValueError(f"工作表 '{sheet_name}' 不存在") ws = wb[sheet_name] # 获取数据范围 from openpyxl.utils import range_boundaries min_col, min_row, max_col, max_row = range_boundaries(data_range) # 读取数据及颜色信息 data_with_color = [] for row_idx in range(min_row, max_row + 1): row_data = [] for col_idx in range(min_col, max_col + 1): cell = ws.cell(row=row_idx, column=col_idx) row_data.append(cell.value) # 获取排序列的颜色 sort_cell = ws.cell(row=row_idx, column=min_col + sort_by_column) if sort_by == "fill": color = sort_cell.fill.start_color.rgb if sort_cell.fill and sort_cell.fill.start_color else None else: # font color = sort_cell.font.color.rgb if sort_cell.font and sort_cell.font.color else None # 转换颜色格式 if color and len(str(color)) == 8: # ARGB格式 color = f"#{str(color)[2:]}" # 去掉前两位的Alpha通道 elif color: color = f"#{color}" if not str(color).startswith('#') else color data_with_color.append((row_data, color)) # 按颜色顺序排序 def color_sort_key(item): row_data, color = item if color and color.upper() in [c.upper() for c in color_order]: # 找到颜色在color_order中的索引 for i, c in enumerate(color_order): if c.upper() == str(color).upper(): return i return len(color_order) # 未匹配的颜色排在最后 sorted_data_with_color = sorted(data_with_color, key=color_sort_key) # 写回数据 for i, (row_data, color) in enumerate(sorted_data_with_color): for j, value in enumerate(row_data): ws.cell(row=min_row + i, column=min_col + j, value=value) wb.save(str(file_path)) wb.close() logger.info(f"按颜色排序成功: {file_path}") return { "success": True, "message": "按颜色排序成功", "filename": str(file_path), "sort_by_column": sort_by_column, "color_order": color_order, "sort_by": sort_by, } except Exception as e: logger.error(f"按颜色排序失败: {e}") return {"success": False, "message": f"排序失败: {str(e)}"} def copy_filtered_data( self, filename: str, sheet_name: str, source_range: str, target_cell: str, filter_column: int, filter_value: str, filter_operator: str = "equals", ) -> dict[str, Any]: """复制筛选结果到新位置. Args: filename: 文件名 sheet_name: 工作表名称 source_range: 源数据范围 target_cell: 目标起始单元格 filter_column: 筛选列索引 filter_value: 筛选值 filter_operator: 筛选操作符 """ try: file_path = config.paths.output_dir / filename self.file_manager.validate_file_path(file_path, must_exist=True) wb = load_workbook(str(file_path)) if sheet_name not in wb.sheetnames: raise ValueError(f"工作表 '{sheet_name}' 不存在") ws = wb[sheet_name] # 获取数据范围 from openpyxl.utils import range_boundaries, coordinate_from_string, column_index_from_string min_col, min_row, max_col, max_row = range_boundaries(source_range) # 读取数据 data_list = [] for row_idx in range(min_row, max_row + 1): row_data = [] for col_idx in range(min_col, max_col + 1): cell = ws.cell(row=row_idx, column=col_idx) row_data.append(cell.value) data_list.append(row_data) # 筛选数据 filtered_data = [] for row in data_list: if filter_column >= len(row): continue cell_value = str(row[filter_column]) if row[filter_column] is not None else "" filter_val_str = str(filter_value) # 应用筛选条件 match = False if filter_operator == "equals": match = cell_value == filter_val_str elif filter_operator == "notEquals": match = cell_value != filter_val_str elif filter_operator == "contains": match = filter_val_str in cell_value elif filter_operator == "notContains": match = filter_val_str not in cell_value elif filter_operator == "beginsWith": match = cell_value.startswith(filter_val_str) elif filter_operator == "endsWith": match = cell_value.endswith(filter_val_str) elif filter_operator == "greaterThan": try: match = float(cell_value) > float(filter_val_str) except (ValueError, TypeError): match = False elif filter_operator == "lessThan": try: match = float(cell_value) < float(filter_val_str) except (ValueError, TypeError): match = False if match: filtered_data.append(row) # 解析目标单元格 target_col_letter, target_row = coordinate_from_string(target_cell) target_col = column_index_from_string(target_col_letter) # 写入筛选结果 for i, row_data in enumerate(filtered_data): for j, value in enumerate(row_data): ws.cell(row=target_row + i, column=target_col + j, value=value) wb.save(str(file_path)) wb.close() logger.info(f"筛选数据复制成功: {file_path}, 复制了 {len(filtered_data)} 行") return { "success": True, "message": f"成功复制 {len(filtered_data)} 行筛选结果", "filename": str(file_path), "source_range": source_range, "target_cell": target_cell, "rows_copied": len(filtered_data), } except Exception as e: logger.error(f"复制筛选数据失败: {e}") return {"success": False, "message": f"复制失败: {str(e)}"}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/walkingzzzy/office-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server