Skip to main content
Glama
data_processor.py (9.93 kB)
"""Data processor module.

Handles querying, range reads/writes and formula application for Excel
workbooks.
"""

import logging
import operator
import re
from typing import Dict, Any, List, Optional, Union, Tuple

import pandas as pd
import openpyxl
from openpyxl.utils import get_column_letter, range_boundaries
from openpyxl.utils.cell import coordinate_from_string, column_index_from_string

from .file_manager import FileManager


class DataProcessor:
    """Read, write and query worksheet data through a ``FileManager``.

    Every public method returns a plain dict: ``{"success": True, ...}`` on
    success or ``{"error": <message>}`` on failure. Exceptions are logged and
    converted to the error form instead of being propagated.
    """

    # Comparison operators accepted inside a condition dict, mapped to the
    # callable that is applied as ``op(column_series, operand)``.
    _COMPARISONS = {
        "eq": operator.eq,
        "ne": operator.ne,
        "gt": operator.gt,
        "lt": operator.lt,
        "gte": operator.ge,
        "lte": operator.le,
    }

    def __init__(self, file_manager: FileManager):
        """Store the file manager used for all workbook loading and saving."""
        self.file_manager = file_manager

    def read_range(self, file_path: str, sheet_name: str,
                   cell_range: Optional[str] = None) -> Dict[str, Any]:
        """Read the values of a range, or of the whole sheet.

        Args:
            file_path: Path handed to ``file_manager.load_workbook``.
            sheet_name: Name of the worksheet to read.
            cell_range: Range such as ``"A1:C5"``. When empty or ``None`` the
                whole sheet is read and fully blank rows are dropped.

        Returns:
            ``{"success", "data", "sheet_name", "range"}`` where ``data`` is a
            list of rows, or ``{"error": message}``.
        """
        try:
            workbook = self.file_manager.load_workbook(file_path, read_only=True)
            if not workbook:
                return {"error": "无法加载工作簿"}

            # The workbook is opened read-only, so make sure it is closed on
            # every path, including the early return and any exception.
            try:
                if sheet_name not in workbook.sheetnames:
                    return {"error": f"工作表 '{sheet_name}' 不存在"}

                sheet = workbook[sheet_name]
                if cell_range:
                    data = self._read_range_data(sheet, cell_range)
                else:
                    data = self._read_all_data(sheet)
            finally:
                workbook.close()

            return {
                "success": True,
                "data": data,
                "sheet_name": sheet_name,
                "range": cell_range or "all"
            }
        except Exception as e:
            logging.error(f"读取范围数据失败: {e}")
            return {"error": str(e)}

    def write_range(self, file_path: str, sheet_name: str, start_cell: str,
                    data: List[List[Any]]) -> Dict[str, Any]:
        """Write a 2-D list of values starting at ``start_cell``.

        The worksheet is created when it does not exist yet.

        Args:
            file_path: Path handed to the file manager for load and save.
            sheet_name: Target worksheet name.
            start_cell: Top-left coordinate of the write, e.g. ``"B2"``.
            data: Rows of values; row ``i`` / column ``j`` lands ``i`` rows
                below and ``j`` columns right of ``start_cell``.

        Returns:
            ``{"success", "message", "rows_written", "cols_written"}`` or
            ``{"error": message}``. ``cols_written`` is the width of the
            widest row.
        """
        try:
            workbook = self.file_manager.load_workbook(file_path)
            if not workbook:
                return {"error": "无法加载工作簿"}

            if sheet_name in workbook.sheetnames:
                sheet = workbook[sheet_name]
            else:
                sheet = workbook.create_sheet(title=sheet_name)

            # coordinate_from_string yields (column letters, row number).
            start_col, start_row = coordinate_from_string(start_cell)
            start_col_idx = column_index_from_string(start_col)

            # NOTE(review): openpyxl's ``cell(..., value=None)`` appears to
            # leave an existing value untouched, so ``None`` entries do not
            # clear cells - confirm this is the intended behaviour.
            for row_offset, row_values in enumerate(data):
                for col_offset, cell_value in enumerate(row_values):
                    sheet.cell(
                        row=start_row + row_offset,
                        column=start_col_idx + col_offset,
                        value=cell_value
                    )

            if not self.file_manager.save_workbook(workbook, file_path):
                return {"error": "保存工作簿失败"}

            return {
                "success": True,
                "message": f"成功写入数据到 {sheet_name}!{start_cell}",
                "rows_written": len(data),
                # Use the widest row so ragged input is reported correctly.
                "cols_written": max((len(row) for row in data), default=0)
            }
        except Exception as e:
            logging.error(f"写入范围数据失败: {e}")
            return {"error": str(e)}

    def query_data(self, file_path: str, sheet_name: str,
                   conditions: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Load a sheet as a DataFrame and return the rows matching ``conditions``.

        Args:
            file_path: Path handed to ``file_manager.read_as_dataframe``.
            sheet_name: Worksheet to query.
            conditions: Optional mapping of column name to a filter; see
                ``_apply_conditions`` for the accepted forms.

        Returns:
            ``{"success", "data", "columns", "row_count", "sheet_name"}`` or
            ``{"error": message}``.
        """
        try:
            df = self.file_manager.read_as_dataframe(file_path, sheet_name)
            if df is None:
                return {"error": "无法读取数据"}

            if conditions:
                df = self._apply_conditions(df, conditions)

            data = df.values.tolist()
            columns = df.columns.tolist()

            return {
                "success": True,
                "data": data,
                "columns": columns,
                "row_count": len(data),
                "sheet_name": sheet_name
            }
        except Exception as e:
            logging.error(f"查询数据失败: {e}")
            return {"error": str(e)}

    def calculate_formula(self, file_path: str, sheet_name: str, cell: str,
                          formula: str) -> Dict[str, Any]:
        """Write ``formula`` into ``cell``, save, and read the cell back.

        A leading ``=`` is added to the formula when missing.

        NOTE(review): openpyxl does not evaluate formulas itself, so the
        ``value`` read back after saving is most likely the formula text (or
        ``None``) rather than a computed result. This depends on how
        ``file_manager.load_workbook`` opens the file - confirm.

        Returns:
            ``{"success", "cell", "formula", "value"}`` or
            ``{"error": message}``.
        """
        try:
            workbook = self.file_manager.load_workbook(file_path)
            if not workbook:
                return {"error": "无法加载工作簿"}

            if sheet_name not in workbook.sheetnames:
                return {"error": f"工作表 '{sheet_name}' 不存在"}

            sheet = workbook[sheet_name]

            if not formula.startswith('='):
                formula = '=' + formula

            sheet[cell] = formula

            if not self.file_manager.save_workbook(workbook, file_path):
                return {"error": "保存工作簿失败"}

            # Re-open the saved file to read the cell back. Guard against a
            # failed reload and always release the read-only handle.
            reloaded = self.file_manager.load_workbook(file_path, read_only=True)
            if not reloaded:
                return {"error": "无法加载工作簿"}
            try:
                result_value = reloaded[sheet_name][cell].value
            finally:
                reloaded.close()

            return {
                "success": True,
                "cell": cell,
                "formula": formula,
                "value": result_value
            }
        except Exception as e:
            logging.error(f"计算公式失败: {e}")
            return {"error": str(e)}

    def _read_range_data(self, sheet, cell_range: str) -> List[List[Any]]:
        """Return the values inside ``cell_range`` as a list of row lists."""
        min_col, min_row, max_col, max_row = range_boundaries(cell_range)
        return [
            list(row)
            for row in sheet.iter_rows(
                min_row=min_row, max_row=max_row,
                min_col=min_col, max_col=max_col,
                values_only=True,
            )
        ]

    def _read_all_data(self, sheet) -> List[List[Any]]:
        """Return every row of ``sheet`` that has at least one non-None value."""
        return [
            list(row)
            for row in sheet.iter_rows(values_only=True)
            if any(value is not None for value in row)
        ]

    def _apply_conditions(self, df: pd.DataFrame,
                          conditions: Dict[str, Any]) -> pd.DataFrame:
        """Filter ``df`` by per-column conditions and return a new DataFrame.

        Args:
            df: Source frame; it is copied, never modified.
            conditions: Mapping of column name to either a plain value
                (equality test) or a dict of operators. Supported operators
                are ``eq``, ``ne``, ``gt``, ``lt``, ``gte``, ``lte``,
                ``contains`` and ``in``. Every operator present in the dict is
                applied, so ``{"gte": 1, "lte": 10}`` expresses a range.

        Conditions on columns that are not in ``df`` and unknown operator
        names are skipped with a warning rather than raising.
        """
        result_df = df.copy()

        for column, condition in conditions.items():
            if column not in df.columns:
                logging.warning(f"忽略未知列的查询条件: {column}")
                continue

            if not isinstance(condition, dict):
                # Plain value: simple equality filter.
                result_df = result_df[result_df[column] == condition]
                continue

            for op_name, operand in condition.items():
                series = result_df[column]
                if op_name in self._COMPARISONS:
                    mask = self._COMPARISONS[op_name](series, operand)
                elif op_name == 'contains':
                    # pandas interprets the pattern as a regular expression
                    # by default; missing values never match.
                    mask = series.astype(str).str.contains(operand, na=False)
                elif op_name == 'in':
                    mask = series.isin(operand)
                else:
                    logging.warning(f"忽略不支持的查询操作符: {op_name}")
                    continue
                result_df = result_df[mask]

        return result_df

    def get_range_info(self, file_path: str, sheet_name: str,
                       cell_range: str) -> Dict[str, Any]:
        """Report size and fill statistics for ``cell_range``.

        Returns:
            ``{"success", "range", "cell_count", "non_empty_count",
            "empty_count", "start_cell", "end_cell"}`` or
            ``{"error": message}``.
        """
        try:
            workbook = self.file_manager.load_workbook(file_path, read_only=True)
            if not workbook:
                return {"error": "无法加载工作簿"}

            # Close the read-only workbook on every path out of this block.
            try:
                if sheet_name not in workbook.sheetnames:
                    return {"error": f"工作表 '{sheet_name}' 不存在"}

                min_col, min_row, max_col, max_row = range_boundaries(cell_range)
                sheet = workbook[sheet_name]

                cell_count = (max_row - min_row + 1) * (max_col - min_col + 1)
                non_empty_count = sum(
                    1
                    for row in sheet.iter_rows(
                        min_row=min_row, max_row=max_row,
                        min_col=min_col, max_col=max_col,
                        values_only=True,
                    )
                    for value in row
                    if value is not None
                )
            finally:
                workbook.close()

            # Build the corner coordinates from the parsed bounds instead of
            # touching the worksheet, which is already closed at this point.
            return {
                "success": True,
                "range": cell_range,
                "cell_count": cell_count,
                "non_empty_count": non_empty_count,
                "empty_count": cell_count - non_empty_count,
                "start_cell": f"{get_column_letter(min_col)}{min_row}",
                "end_cell": f"{get_column_letter(max_col)}{max_row}"
            }
        except Exception as e:
            logging.error(f"获取范围信息失败: {e}")
            return {"error": str(e)}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/xuhongxin/excel-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.