Skip to main content
Glama

Office MCP Server

by walkingzzzy
excel_analysis.py31.4 kB
"""Excel 数据分析模块.""" import statistics from typing import Any, Optional import numpy as np import pandas as pd from openpyxl import load_workbook from loguru import logger from scipy import stats from sklearn.linear_model import LinearRegression from sklearn.preprocessing import PolynomialFeatures from office_mcp_server.config import config from office_mcp_server.utils.file_manager import FileManager class ExcelAnalysisOperations: """Excel 数据分析操作类.""" def __init__(self) -> None: """初始化数据分析操作类.""" self.file_manager = FileManager() def descriptive_statistics( self, filename: str, sheet_name: str, data_range: str, ) -> dict[str, Any]: """描述性统计分析. Args: filename: 文件名 sheet_name: 工作表名称 data_range: 数据范围 (如 'A1:A100') """ try: file_path = config.paths.output_dir / filename self.file_manager.validate_file_path(file_path, must_exist=True) wb = load_workbook(str(file_path)) if sheet_name not in wb.sheetnames: raise ValueError(f"工作表 '{sheet_name}' 不存在") ws = wb[sheet_name] data_cells = ws[data_range] # 提取数值数据 values = [] for row in data_cells: if isinstance(row, tuple): for cell in row: if isinstance(cell.value, (int, float)) and cell.value is not None: values.append(cell.value) else: if isinstance(row.value, (int, float)) and row.value is not None: values.append(row.value) if not values: raise ValueError("没有找到有效的数值数据") # 计算统计指标 stats = { "count": len(values), "sum": sum(values), "mean": statistics.mean(values), "median": statistics.median(values), "mode": statistics.mode(values) if len(set(values)) < len(values) else None, "std_dev": statistics.stdev(values) if len(values) > 1 else 0, "variance": statistics.variance(values) if len(values) > 1 else 0, "min": min(values), "max": max(values), "range": max(values) - min(values), } # 计算四分位数 sorted_values = sorted(values) n = len(sorted_values) stats["q1"] = sorted_values[n // 4] if n >= 4 else sorted_values[0] stats["q2"] = statistics.median(values) stats["q3"] = sorted_values[3 * n // 4] if n >= 4 else sorted_values[-1] logger.info(f"描述性统计完成: {file_path}") return { "success": True, "filename": str(file_path), "sheet_name": sheet_name, "data_range": data_range, "statistics": stats, } except Exception as e: logger.error(f"描述性统计失败: {e}") return {"success": False, "message": f"统计失败: {str(e)}"} def correlation_analysis( self, filename: str, sheet_name: str, data_range1: str, data_range2: str, ) -> dict[str, Any]: """相关性分析. Args: filename: 文件名 sheet_name: 工作表名称 data_range1: 第一个数据范围 (如 'A1:A100') data_range2: 第二个数据范围 (如 'B1:B100') """ try: file_path = config.paths.output_dir / filename self.file_manager.validate_file_path(file_path, must_exist=True) wb = load_workbook(str(file_path)) if sheet_name not in wb.sheetnames: raise ValueError(f"工作表 '{sheet_name}' 不存在") ws = wb[sheet_name] # 提取第一组数据 data_cells1 = ws[data_range1] values1 = [] for row in data_cells1: if isinstance(row, tuple): for cell in row: if isinstance(cell.value, (int, float)) and cell.value is not None: values1.append(cell.value) else: if isinstance(row.value, (int, float)) and row.value is not None: values1.append(row.value) # 提取第二组数据 data_cells2 = ws[data_range2] values2 = [] for row in data_cells2: if isinstance(row, tuple): for cell in row: if isinstance(cell.value, (int, float)) and cell.value is not None: values2.append(cell.value) else: if isinstance(row.value, (int, float)) and row.value is not None: values2.append(row.value) if len(values1) != len(values2): raise ValueError("两组数据长度不一致") if len(values1) < 2: raise ValueError("数据量太少,无法计算相关性") # 计算皮尔逊相关系数 n = len(values1) mean1 = statistics.mean(values1) mean2 = statistics.mean(values2) numerator = sum((x - mean1) * (y - mean2) for x, y in zip(values1, values2)) denominator1 = sum((x - mean1) ** 2 for x in values1) denominator2 = sum((y - mean2) ** 2 for y in values2) if denominator1 == 0 or denominator2 == 0: correlation = 0 else: correlation = numerator / ((denominator1 * denominator2) ** 0.5) # 判断相关性强度 abs_corr = abs(correlation) if abs_corr >= 0.8: strength = "强相关" elif abs_corr >= 0.5: strength = "中等相关" elif abs_corr >= 0.3: strength = "弱相关" else: strength = "几乎不相关" logger.info(f"相关性分析完成: {file_path}") return { "success": True, "filename": str(file_path), "sheet_name": sheet_name, "data_range1": data_range1, "data_range2": data_range2, "correlation": round(correlation, 4), "strength": strength, "sample_size": n, } except Exception as e: logger.error(f"相关性分析失败: {e}") return {"success": False, "message": f"分析失败: {str(e)}"} def goal_seek( self, filename: str, sheet_name: str, target_cell: str, target_value: float, variable_cell: str, max_iterations: int = 100, tolerance: float = 0.001, ) -> dict[str, Any]: """单变量求解(目标搜索). Args: filename: 文件名 sheet_name: 工作表名称 target_cell: 目标单元格 (如 'D10') target_value: 目标值 variable_cell: 变量单元格 (如 'B5') max_iterations: 最大迭代次数 tolerance: 容差 """ try: file_path = config.paths.output_dir / filename self.file_manager.validate_file_path(file_path, must_exist=True) wb = load_workbook(str(file_path)) if sheet_name not in wb.sheetnames: raise ValueError(f"工作表 '{sheet_name}' 不存在") ws = wb[sheet_name] # 简化的单变量求解 (二分法) variable_value = ws[variable_cell].value or 0 min_val = variable_value - 1000 max_val = variable_value + 1000 for iteration in range(max_iterations): # 尝试中点值 test_value = (min_val + max_val) / 2 ws[variable_cell] = test_value # 重新计算 (注意: openpyxl不能自动计算公式) # 这里只是演示逻辑,实际需要Microsoft Excel API result_value = ws[target_cell].value if result_value is None: raise ValueError(f"目标单元格 {target_cell} 没有公式或值") # 检查是否为公式字符串 if isinstance(result_value, str) and result_value.startswith('='): raise ValueError(f"目标单元格 {target_cell} 包含公式,openpyxl无法自动计算。请使用Excel应用程序或先计算公式值。") # 转换为浮点数 try: result_float = float(result_value) except (ValueError, TypeError): raise ValueError(f"目标单元格 {target_cell} 的值 '{result_value}' 无法转换为数字") diff = abs(result_float - target_value) if diff < tolerance: wb.save(str(file_path)) wb.close() logger.info(f"目标搜索完成: {file_path}") return { "success": True, "message": "目标搜索成功", "filename": str(file_path), "target_cell": target_cell, "target_value": target_value, "variable_cell": variable_cell, "solution": test_value, "iterations": iteration + 1, "final_diff": diff, } # 调整搜索范围 if float(result_value) < target_value: min_val = test_value else: max_val = test_value wb.close() return { "success": False, "message": f"达到最大迭代次数 {max_iterations}, 未找到精确解", "note": "此功能需要Excel应用程序支持以执行公式计算" } except Exception as e: logger.error(f"目标搜索失败: {e}") return {"success": False, "message": f"搜索失败: {str(e)}"} def regression_analysis( self, filename: str, sheet_name: str, x_range: str, y_range: str, regression_type: str = "linear", ) -> dict[str, Any]: """回归分析. Args: filename: 文件名 sheet_name: 工作表名称 x_range: 自变量范围 (如 'A1:A100') y_range: 因变量范围 (如 'B1:B100') regression_type: 回归类型 ('linear'线性, 'polynomial'多项式) """ try: file_path = config.paths.output_dir / filename self.file_manager.validate_file_path(file_path, must_exist=True) wb = load_workbook(str(file_path)) if sheet_name not in wb.sheetnames: raise ValueError(f"工作表 '{sheet_name}' 不存在") ws = wb[sheet_name] # 提取X数据 x_cells = ws[x_range] x_values = [] for row in x_cells: if isinstance(row, tuple): for cell in row: if isinstance(cell.value, (int, float)) and cell.value is not None: x_values.append(cell.value) else: if isinstance(row.value, (int, float)) and row.value is not None: x_values.append(row.value) # 提取Y数据 y_cells = ws[y_range] y_values = [] for row in y_cells: if isinstance(row, tuple): for cell in row: if isinstance(cell.value, (int, float)) and cell.value is not None: y_values.append(cell.value) else: if isinstance(row.value, (int, float)) and row.value is not None: y_values.append(row.value) if len(x_values) != len(y_values): raise ValueError("X和Y数据长度不一致") if len(x_values) < 2: raise ValueError("数据量太少,无法进行回归分析") X = np.array(x_values).reshape(-1, 1) y = np.array(y_values) if regression_type == "linear": # 线性回归 model = LinearRegression() model.fit(X, y) y_pred = model.predict(X) r_squared = model.score(X, y) # 计算标准误差 residuals = y - y_pred mse = np.mean(residuals ** 2) rmse = np.sqrt(mse) result = { "success": True, "regression_type": "linear", "coefficient": float(model.coef_[0]), "intercept": float(model.intercept_), "r_squared": float(r_squared), "rmse": float(rmse), "equation": f"y = {model.coef_[0]:.4f}x + {model.intercept_:.4f}", "sample_size": len(x_values), } elif regression_type == "polynomial": # 多项式回归 (2次) poly = PolynomialFeatures(degree=2) X_poly = poly.fit_transform(X) model = LinearRegression() model.fit(X_poly, y) y_pred = model.predict(X_poly) r_squared = model.score(X_poly, y) residuals = y - y_pred mse = np.mean(residuals ** 2) rmse = np.sqrt(mse) result = { "success": True, "regression_type": "polynomial", "coefficients": [float(c) for c in model.coef_], "intercept": float(model.intercept_), "r_squared": float(r_squared), "rmse": float(rmse), "degree": 2, "sample_size": len(x_values), } else: raise ValueError(f"不支持的回归类型: {regression_type}") logger.info(f"回归分析完成: {file_path}") result["filename"] = str(file_path) result["sheet_name"] = sheet_name return result except Exception as e: logger.error(f"回归分析失败: {e}") return {"success": False, "message": f"分析失败: {str(e)}"} def anova_analysis( self, filename: str, sheet_name: str, *group_ranges: str, ) -> dict[str, Any]: """方差分析 (ANOVA). Args: filename: 文件名 sheet_name: 工作表名称 *group_ranges: 各组数据范围 (如 'A1:A10', 'B1:B10', 'C1:C10') """ try: file_path = config.paths.output_dir / filename self.file_manager.validate_file_path(file_path, must_exist=True) wb = load_workbook(str(file_path)) if sheet_name not in wb.sheetnames: raise ValueError(f"工作表 '{sheet_name}' 不存在") ws = wb[sheet_name] # 提取各组数据 groups = [] for group_range in group_ranges: cells = ws[group_range] values = [] for row in cells: if isinstance(row, tuple): for cell in row: if isinstance(cell.value, (int, float)) and cell.value is not None: values.append(cell.value) else: if isinstance(row.value, (int, float)) and row.value is not None: values.append(row.value) if values: groups.append(values) if len(groups) < 2: raise ValueError("至少需要2组数据进行方差分析") # 执行单因素方差分析 f_statistic, p_value = stats.f_oneway(*groups) # 计算组间和组内统计量 all_values = [v for group in groups for v in group] grand_mean = np.mean(all_values) # 组均值 group_means = [np.mean(group) for group in groups] group_sizes = [len(group) for group in groups] # 判断显著性 alpha = 0.05 significant = p_value < alpha logger.info(f"方差分析完成: {file_path}") return { "success": True, "filename": str(file_path), "sheet_name": sheet_name, "f_statistic": float(f_statistic), "p_value": float(p_value), "significant": significant, "significance_level": alpha, "groups_count": len(groups), "group_sizes": group_sizes, "group_means": [float(m) for m in group_means], "grand_mean": float(grand_mean), "interpretation": "组间存在显著差异" if significant else "组间不存在显著差异", } except Exception as e: logger.error(f"方差分析失败: {e}") return {"success": False, "message": f"分析失败: {str(e)}"} def t_test( self, filename: str, sheet_name: str, group1_range: str, group2_range: str, test_type: str = "independent", ) -> dict[str, Any]: """t检验. Args: filename: 文件名 sheet_name: 工作表名称 group1_range: 第一组数据范围 group2_range: 第二组数据范围 test_type: 检验类型 ('independent'独立样本, 'paired'配对样本) """ try: file_path = config.paths.output_dir / filename self.file_manager.validate_file_path(file_path, must_exist=True) wb = load_workbook(str(file_path)) if sheet_name not in wb.sheetnames: raise ValueError(f"工作表 '{sheet_name}' 不存在") ws = wb[sheet_name] # 提取第一组数据 cells1 = ws[group1_range] values1 = [] for row in cells1: if isinstance(row, tuple): for cell in row: if isinstance(cell.value, (int, float)) and cell.value is not None: values1.append(cell.value) else: if isinstance(row.value, (int, float)) and row.value is not None: values1.append(row.value) # 提取第二组数据 cells2 = ws[group2_range] values2 = [] for row in cells2: if isinstance(row, tuple): for cell in row: if isinstance(cell.value, (int, float)) and cell.value is not None: values2.append(cell.value) else: if isinstance(row.value, (int, float)) and row.value is not None: values2.append(row.value) if not values1 or not values2: raise ValueError("数据不足") if test_type == "independent": # 独立样本t检验 t_statistic, p_value = stats.ttest_ind(values1, values2) elif test_type == "paired": # 配对样本t检验 if len(values1) != len(values2): raise ValueError("配对t检验要求两组数据长度相同") t_statistic, p_value = stats.ttest_rel(values1, values2) else: raise ValueError(f"不支持的检验类型: {test_type}") alpha = 0.05 significant = p_value < alpha logger.info(f"t检验完成: {file_path}") return { "success": True, "filename": str(file_path), "sheet_name": sheet_name, "test_type": test_type, "t_statistic": float(t_statistic), "p_value": float(p_value), "significant": significant, "significance_level": alpha, "group1_mean": float(np.mean(values1)), "group2_mean": float(np.mean(values2)), "group1_size": len(values1), "group2_size": len(values2), "interpretation": "两组存在显著差异" if significant else "两组不存在显著差异", } except Exception as e: logger.error(f"t检验失败: {e}") return {"success": False, "message": f"分析失败: {str(e)}"} def chi_square_test( self, filename: str, sheet_name: str, observed_range: str, ) -> dict[str, Any]: """卡方检验. Args: filename: 文件名 sheet_name: 工作表名称 observed_range: 观测频数范围 (如 'A1:B2') """ try: file_path = config.paths.output_dir / filename self.file_manager.validate_file_path(file_path, must_exist=True) wb = load_workbook(str(file_path)) if sheet_name not in wb.sheetnames: raise ValueError(f"工作表 '{sheet_name}' 不存在") ws = wb[sheet_name] # 提取观测频数 cells = ws[observed_range] observed = [] for row in cells: row_data = [] if isinstance(row, tuple): for cell in row: if isinstance(cell.value, (int, float)) and cell.value is not None: row_data.append(cell.value) else: if isinstance(row.value, (int, float)) and row.value is not None: row_data.append(row.value) if row_data: observed.append(row_data) if not observed: raise ValueError("没有找到有效的观测数据") observed = np.array(observed) # 执行卡方检验 chi2, p_value, dof, expected = stats.chi2_contingency(observed) alpha = 0.05 significant = p_value < alpha logger.info(f"卡方检验完成: {file_path}") return { "success": True, "filename": str(file_path), "sheet_name": sheet_name, "chi_square": float(chi2), "p_value": float(p_value), "degrees_of_freedom": int(dof), "significant": significant, "significance_level": alpha, "observed": observed.tolist(), "expected": expected.tolist(), "interpretation": "存在显著关联" if significant else "不存在显著关联", } except Exception as e: logger.error(f"卡方检验失败: {e}") return {"success": False, "message": f"分析失败: {str(e)}"} def trend_analysis( self, filename: str, sheet_name: str, data_range: str, periods_ahead: int = 5, ) -> dict[str, Any]: """趋势分析和预测. Args: filename: 文件名 sheet_name: 工作表名称 data_range: 数据范围 periods_ahead: 预测未来期数 """ try: file_path = config.paths.output_dir / filename self.file_manager.validate_file_path(file_path, must_exist=True) wb = load_workbook(str(file_path)) if sheet_name not in wb.sheetnames: raise ValueError(f"工作表 '{sheet_name}' 不存在") ws = wb[sheet_name] # 提取数据 cells = ws[data_range] values = [] for row in cells: if isinstance(row, tuple): for cell in row: if isinstance(cell.value, (int, float)) and cell.value is not None: values.append(cell.value) else: if isinstance(row.value, (int, float)) and row.value is not None: values.append(row.value) if len(values) < 3: raise ValueError("数据量太少,无法进行趋势分析") # 使用线性回归进行趋势预测 X = np.arange(len(values)).reshape(-1, 1) y = np.array(values) model = LinearRegression() model.fit(X, y) # 预测未来值 future_X = np.arange(len(values), len(values) + periods_ahead).reshape(-1, 1) predictions = model.predict(future_X) # 计算趋势 slope = model.coef_[0] if slope > 0: trend = "上升" elif slope < 0: trend = "下降" else: trend = "平稳" logger.info(f"趋势分析完成: {file_path}") return { "success": True, "filename": str(file_path), "sheet_name": sheet_name, "trend": trend, "slope": float(slope), "intercept": float(model.intercept_), "r_squared": float(model.score(X, y)), "predictions": [float(p) for p in predictions], "periods_ahead": periods_ahead, } except Exception as e: logger.error(f"趋势分析失败: {e}") return {"success": False, "message": f"分析失败: {str(e)}"} def moving_average( self, filename: str, sheet_name: str, data_range: str, window: int = 3, output_cell: Optional[str] = None, ) -> dict[str, Any]: """移动平均. Args: filename: 文件名 sheet_name: 工作表名称 data_range: 数据范围 window: 移动窗口大小 output_cell: 输出起始单元格 (可选) """ try: file_path = config.paths.output_dir / filename self.file_manager.validate_file_path(file_path, must_exist=True) wb = load_workbook(str(file_path)) if sheet_name not in wb.sheetnames: raise ValueError(f"工作表 '{sheet_name}' 不存在") ws = wb[sheet_name] # 提取数据 cells = ws[data_range] values = [] for row in cells: if isinstance(row, tuple): for cell in row: if isinstance(cell.value, (int, float)) and cell.value is not None: values.append(cell.value) else: if isinstance(row.value, (int, float)) and row.value is not None: values.append(row.value) if len(values) < window: raise ValueError(f"数据量 ({len(values)}) 小于窗口大小 ({window})") # 计算移动平均 series = pd.Series(values) ma = series.rolling(window=window).mean() ma_values = ma.tolist() # 如果指定输出单元格,写入结果 if output_cell: for i, val in enumerate(ma_values): if not pd.isna(val): ws[f"{output_cell[0]}{int(output_cell[1:]) + i}"] = float(val) wb.save(str(file_path)) wb.close() logger.info(f"移动平均计算完成: {file_path}") return { "success": True, "filename": str(file_path), "sheet_name": sheet_name, "window": window, "original_data": values, "moving_average": [float(v) if not pd.isna(v) else None for v in ma_values], } except Exception as e: logger.error(f"移动平均计算失败: {e}") return {"success": False, "message": f"计算失败: {str(e)}"} def exponential_smoothing( self, filename: str, sheet_name: str, data_range: str, alpha: float = 0.3, output_cell: Optional[str] = None, ) -> dict[str, Any]: """指数平滑. Args: filename: 文件名 sheet_name: 工作表名称 data_range: 数据范围 alpha: 平滑系数 (0-1) output_cell: 输出起始单元格 (可选) """ try: if not 0 < alpha < 1: raise ValueError("平滑系数alpha必须在0到1之间") file_path = config.paths.output_dir / filename self.file_manager.validate_file_path(file_path, must_exist=True) wb = load_workbook(str(file_path)) if sheet_name not in wb.sheetnames: raise ValueError(f"工作表 '{sheet_name}' 不存在") ws = wb[sheet_name] # 提取数据 cells = ws[data_range] values = [] for row in cells: if isinstance(row, tuple): for cell in row: if isinstance(cell.value, (int, float)) and cell.value is not None: values.append(cell.value) else: if isinstance(row.value, (int, float)) and row.value is not None: values.append(row.value) if len(values) < 2: raise ValueError("数据量太少") # 计算指数平滑 smoothed = [values[0]] for i in range(1, len(values)): smoothed_value = alpha * values[i] + (1 - alpha) * smoothed[i - 1] smoothed.append(smoothed_value) # 如果指定输出单元格,写入结果 if output_cell: for i, val in enumerate(smoothed): ws[f"{output_cell[0]}{int(output_cell[1:]) + i}"] = float(val) wb.save(str(file_path)) wb.close() logger.info(f"指数平滑计算完成: {file_path}") return { "success": True, "filename": str(file_path), "sheet_name": sheet_name, "alpha": alpha, "original_data": values, "smoothed_data": smoothed, } except Exception as e: logger.error(f"指数平滑计算失败: {e}") return {"success": False, "message": f"计算失败: {str(e)}"}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/walkingzzzy/office-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server