# data_processor.py
"""
数据处理器模块
处理Excel数据的查询、范围读写、公式计算等操作
"""
import re
import logging
from typing import Dict, Any, List, Optional, Union, Tuple
import pandas as pd
import openpyxl
from openpyxl.utils import range_boundaries
from openpyxl.utils.cell import coordinate_from_string, column_index_from_string
from .file_manager import FileManager
class DataProcessor:
    """Read, write and query Excel worksheet data through a file manager.

    Every public method returns a plain dict: ``{"success": True, ...}`` on
    success or ``{"error": <message>}`` on failure. Exceptions raised while
    processing are logged and converted into the error form instead of being
    propagated to the caller.
    """

    def __init__(self, file_manager: "FileManager"):
        """Store the file manager used to load and save workbooks."""
        self.file_manager = file_manager

    @staticmethod
    def _close_workbook(workbook) -> None:
        """Close *workbook* if one was loaded; log instead of raising on failure."""
        if not workbook:
            return
        try:
            workbook.close()
        except Exception as e:
            logging.warning(f"关闭工作簿失败: {e}")

    def read_range(self, file_path: str, sheet_name: str,
                   cell_range: Optional[str] = None) -> Dict[str, Any]:
        """Read the values of a cell range, or of the whole sheet.

        Args:
            file_path: Path handed to the file manager's ``load_workbook``.
            sheet_name: Name of the worksheet to read.
            cell_range: Range such as ``"A1:C3"``. When falsy, the whole
                sheet is read and completely empty rows are skipped.

        Returns:
            ``{"success", "data", "sheet_name", "range"}`` where ``data`` is a
            list of rows and ``range`` is *cell_range* or ``"all"``; or
            ``{"error": ...}`` if the workbook cannot be loaded, the sheet
            does not exist, or reading fails.
        """
        workbook = None
        try:
            workbook = self.file_manager.load_workbook(file_path, read_only=True)
            if not workbook:
                return {"error": "无法加载工作簿"}
            if sheet_name not in workbook.sheetnames:
                return {"error": f"工作表 '{sheet_name}' 不存在"}
            sheet = workbook[sheet_name]
            if cell_range:
                data = self._read_range_data(sheet, cell_range)
            else:
                data = self._read_all_data(sheet)
            return {
                "success": True,
                "data": data,
                "sheet_name": sheet_name,
                "range": cell_range or "all"
            }
        except Exception as e:
            logging.error(f"读取范围数据失败: {e}")
            return {"error": str(e)}
        finally:
            # Close on every path, including the early returns above, so the
            # read-only workbook does not keep the file open.
            self._close_workbook(workbook)

    def write_range(self, file_path: str, sheet_name: str, start_cell: str,
                    data: List[List[Any]]) -> Dict[str, Any]:
        """Write a 2-D block of values starting at *start_cell*.

        The worksheet is created when it does not exist yet.

        Args:
            file_path: Path handed to the file manager for load and save.
            sheet_name: Target worksheet name.
            start_cell: Coordinate of the top-left cell, e.g. ``"B2"``.
            data: Rows of values; row ``i``, column ``j`` is written ``i``
                rows below and ``j`` columns right of *start_cell*.

        Returns:
            ``{"success", "message", "rows_written", "cols_written"}`` or
            ``{"error": ...}``. ``cols_written`` is the length of the widest
            row, so it stays accurate when rows have different lengths.
        """
        try:
            workbook = self.file_manager.load_workbook(file_path)
            if not workbook:
                return {"error": "无法加载工作簿"}
            if sheet_name in workbook.sheetnames:
                sheet = workbook[sheet_name]
            else:
                sheet = workbook.create_sheet(title=sheet_name)
            # Split e.g. "B2" into column letters and a 1-based row number.
            start_col, start_row = coordinate_from_string(start_cell)
            start_col_idx = column_index_from_string(start_col)
            for row_offset, row_data in enumerate(data):
                for col_offset, cell_value in enumerate(row_data):
                    sheet.cell(
                        row=start_row + row_offset,
                        column=start_col_idx + col_offset,
                        value=cell_value
                    )
            if not self.file_manager.save_workbook(workbook, file_path):
                return {"error": "保存工作簿失败"}
            return {
                "success": True,
                "message": f"成功写入数据到 {sheet_name}!{start_cell}",
                "rows_written": len(data),
                "cols_written": max((len(row) for row in data), default=0)
            }
        except Exception as e:
            logging.error(f"写入范围数据失败: {e}")
            return {"error": str(e)}

    def query_data(self, file_path: str, sheet_name: str,
                   conditions: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Load a sheet as a DataFrame and return the rows matching *conditions*.

        Args:
            file_path: Path handed to the file manager's ``read_as_dataframe``.
            sheet_name: Worksheet to query.
            conditions: Optional ``{column: condition}`` mapping; see
                ``_apply_conditions`` for the accepted condition forms.

        Returns:
            ``{"success", "data", "columns", "row_count", "sheet_name"}`` or
            ``{"error": ...}``.
        """
        try:
            df = self.file_manager.read_as_dataframe(file_path, sheet_name)
            if df is None:
                return {"error": "无法读取数据"}
            if conditions:
                df = self._apply_conditions(df, conditions)
            data = df.values.tolist()
            columns = df.columns.tolist()
            return {
                "success": True,
                "data": data,
                "columns": columns,
                "row_count": len(data),
                "sheet_name": sheet_name
            }
        except Exception as e:
            logging.error(f"查询数据失败: {e}")
            return {"error": str(e)}

    def calculate_formula(self, file_path: str, sheet_name: str, cell: str,
                          formula: str) -> Dict[str, Any]:
        """Store *formula* in *cell*, save, and read the cell back.

        A leading ``=`` is added to *formula* when missing.

        NOTE(review): openpyxl stores formulas but does not evaluate them, so
        the returned ``value`` is whatever the reloaded cell holds (presumably
        the formula text, depending on how the file manager loads the
        workbook) rather than a computed result.

        Returns:
            ``{"success", "cell", "formula", "value"}`` or ``{"error": ...}``.
        """
        reloaded = None
        try:
            workbook = self.file_manager.load_workbook(file_path)
            if not workbook:
                return {"error": "无法加载工作簿"}
            if sheet_name not in workbook.sheetnames:
                return {"error": f"工作表 '{sheet_name}' 不存在"}
            sheet = workbook[sheet_name]
            if not formula.startswith('='):
                formula = '=' + formula
            sheet[cell] = formula
            if not self.file_manager.save_workbook(workbook, file_path):
                return {"error": "保存工作簿失败"}
            # Reload the saved file and read the cell back.
            reloaded = self.file_manager.load_workbook(file_path, read_only=True)
            if not reloaded:
                return {"error": "无法加载工作簿"}
            result_value = reloaded[sheet_name][cell].value
            return {
                "success": True,
                "cell": cell,
                "formula": formula,
                "value": result_value
            }
        except Exception as e:
            logging.error(f"计算公式失败: {e}")
            return {"error": str(e)}
        finally:
            self._close_workbook(reloaded)

    def _read_range_data(self, sheet, cell_range: str) -> List[List[Any]]:
        """Return the values inside *cell_range* as a list of row lists."""
        min_col, min_row, max_col, max_row = range_boundaries(cell_range)
        return [
            list(row)
            for row in sheet.iter_rows(min_row=min_row, max_row=max_row,
                                       min_col=min_col, max_col=max_col,
                                       values_only=True)
        ]

    def _read_all_data(self, sheet) -> List[List[Any]]:
        """Return every row of *sheet* that has at least one non-None value."""
        return [
            list(row)
            for row in sheet.iter_rows(values_only=True)
            if any(value is not None for value in row)
        ]

    def _apply_conditions(self, df: pd.DataFrame,
                          conditions: Dict[str, Any]) -> pd.DataFrame:
        """Filter *df* by per-column conditions and return the filtered copy.

        Each key of *conditions* is a column name; keys that are not columns
        of *df* are ignored. A non-dict value means equality. A dict value may
        combine any of the operators ``eq``, ``ne``, ``gt``, ``lt``, ``gte``,
        ``lte``, ``contains`` and ``in``; every operator present is applied,
        so ``{"gte": 1, "lte": 5}`` selects a closed range. Unknown operator
        keys are ignored.

        ``contains`` matches against the string form of the values and the
        pattern is passed to ``Series.str.contains`` unchanged, i.e. it is
        treated as a regular expression (the pandas default).
        """
        operators = {
            'eq': lambda series, value: series == value,
            'ne': lambda series, value: series != value,
            'gt': lambda series, value: series > value,
            'lt': lambda series, value: series < value,
            'gte': lambda series, value: series >= value,
            'lte': lambda series, value: series <= value,
            'contains': lambda series, value: series.astype(str).str.contains(value, na=False),
            'in': lambda series, value: series.isin(value),
        }
        result_df = df.copy()
        for column, condition in conditions.items():
            if column not in df.columns:
                continue
            if not isinstance(condition, dict):
                # Simple equality condition.
                result_df = result_df[result_df[column] == condition]
                continue
            # Apply every operator that is present, narrowing step by step.
            for name, build_mask in operators.items():
                if name in condition:
                    result_df = result_df[build_mask(result_df[column], condition[name])]
        return result_df

    def get_range_info(self, file_path: str, sheet_name: str,
                       cell_range: str) -> Dict[str, Any]:
        """Report the size and fill statistics of *cell_range*.

        Returns:
            ``{"success", "range", "cell_count", "non_empty_count",
            "empty_count", "start_cell", "end_cell"}`` or ``{"error": ...}``.
            A cell counts as non-empty when its value is not ``None``.
        """
        # Imported here so this method does not depend on a module-level name
        # beyond those the file already imports.
        from openpyxl.utils import get_column_letter

        workbook = None
        try:
            workbook = self.file_manager.load_workbook(file_path, read_only=True)
            if not workbook:
                return {"error": "无法加载工作簿"}
            if sheet_name not in workbook.sheetnames:
                return {"error": f"工作表 '{sheet_name}' 不存在"}
            min_col, min_row, max_col, max_row = range_boundaries(cell_range)
            sheet = workbook[sheet_name]
            cell_count = (max_row - min_row + 1) * (max_col - min_col + 1)
            non_empty_count = sum(
                1
                for row in sheet.iter_rows(min_row=min_row, max_row=max_row,
                                           min_col=min_col, max_col=max_col,
                                           values_only=True)
                for value in row
                if value is not None
            )
            # Build the corner coordinates from the parsed boundaries instead
            # of asking the sheet for cell objects.
            start_cell = f"{get_column_letter(min_col)}{min_row}"
            end_cell = f"{get_column_letter(max_col)}{max_row}"
            return {
                "success": True,
                "range": cell_range,
                "cell_count": cell_count,
                "non_empty_count": non_empty_count,
                "empty_count": cell_count - non_empty_count,
                "start_cell": start_cell,
                "end_cell": end_cell
            }
        except Exception as e:
            logging.error(f"获取范围信息失败: {e}")
            return {"error": str(e)}
        finally:
            self._close_workbook(workbook)