server.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
ChatExcel MCP Server - enhanced Excel data processing server.
Provides unified Excel file handling, data analysis and visualization tools.
"""
from fastmcp import FastMCP
from typing import Optional

# Import core modules
try:
    from core.config import get_config, SecurityConfig, PerformanceConfig
    from core.exceptions import (
        ChatExcelError, FileAccessError, CodeExecutionError,
        ValidationError, SecurityError, ConfigurationError
    )
    from core.types import (
        ExecutionResult, ExecutionStatus, FileInfo, FileType,
        ExcelMetadata, CodeAnalysis, ProcessingTask, OperationType
    )
    CORE_MODULES_AVAILABLE = True
except ImportError as e:
    print(f"⚠ Failed to import core modules: {e}")
    CORE_MODULES_AVAILABLE = False


# Unified management of optional dependencies
class DependencyManager:
    """Tracks the import status and availability of optional modules."""

    def __init__(self):
        self.available_modules = {}
        self.failed_imports = []
        self.config = get_config() if CORE_MODULES_AVAILABLE else None
        self._initialize_dependencies()

    def _initialize_dependencies(self) -> None:
        """Import every optional dependency."""
        dependencies = {
            'df_processed_error_handler': [
                'VariableLifecycleManager', 'NameErrorHandler',
                'enhanced_execute_with_error_handling', 'analyze_code_variables'
            ],
            'column_checker': ['ColumnChecker'],
            'excel_helper': ['_suggest_excel_read_parameters', 'detect_excel_structure'],
            'excel_smart_tools': [
                'suggest_excel_read_parameters', 'detect_excel_file_structure',
                'create_excel_read_template'
            ],
            'comprehensive_data_verification': ['ComprehensiveDataVerifier'],
            'data_verification': ['verify_data_processing_result', 'DataVerificationEngine'],
            'excel_enhanced_tools': ['ExcelEnhancedProcessor', 'get_excel_processor'],
            'formulas_tools': [
                'parse_excel_formula', 'compile_excel_workbook', 'execute_excel_formula',
                'analyze_excel_dependencies', 'validate_excel_formula'
            ],
            'excel_data_quality_tools': [
                'ExcelDataQualityController', 'ExcelCellContentExtractor',
                'ExcelCharacterConverter', 'ExcelMultiConditionExtractor',
                'ExcelMultiTableMerger', 'ExcelDataCleaner', 'ExcelBatchProcessor'
            ]
        }
        for module_name, imports in dependencies.items():
            self._import_module(module_name, imports)

    def _import_module(self, module_name: str, imports: list) -> None:
        """Safely import a single module."""
        try:
            module = __import__(module_name, fromlist=imports)
            imported_items = {}
            for item_name in imports:
                if hasattr(module, item_name):
                    imported_items[item_name] = getattr(module, item_name)
            if imported_items:
                self.available_modules[module_name] = imported_items
                print(f"✓ Imported module: {module_name}")
            else:
                self.failed_imports.append(module_name)
                print(f"⚠ Module {module_name} import failed: requested items not found")
        except ImportError:
            self.failed_imports.append(module_name)
            print(f"⚠ Module {module_name} import failed: ImportError")
        except Exception as e:
            self.failed_imports.append(module_name)
            print(f"⚠ Error while importing module {module_name}: {str(e)}")

    def get_module_item(self, module_name: str, item_name: str, default=None):
        """Return a specific item from a module, or `default` if unavailable."""
        if module_name in self.available_modules:
            return self.available_modules[module_name].get(item_name, default)
        return default

    def is_available(self, module_name: str) -> bool:
        """Check whether a module was imported successfully."""
        return module_name in self.available_modules


# Initialize the dependency manager
dependency_manager = DependencyManager()

# Standard library and third-party imports
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import io
import base64
import os
from chardet import detect
import traceback
from io import StringIO
import sys
import time
from textwrap import wrap
import json
from pathlib import Path
from typing import Dict, Any, List, Optional, Union, Tuple
import openpyxl
from openpyxl.styles import Font, PatternFill, Alignment
from openpyxl.utils.dataframe import dataframe_to_rows
import logging
from enum import Enum

# Optional modules resolved through the dependency manager
# Excel helper tools
_suggest_excel_read_parameters = dependency_manager.get_module_item('excel_helper', '_suggest_excel_read_parameters')
detect_excel_structure = dependency_manager.get_module_item('excel_helper', 'detect_excel_structure')

# Excel smart tools
suggest_excel_read_parameters = dependency_manager.get_module_item('excel_smart_tools', 'suggest_excel_read_parameters')
detect_excel_file_structure = dependency_manager.get_module_item('excel_smart_tools', 'detect_excel_file_structure')
create_excel_read_template = dependency_manager.get_module_item('excel_smart_tools', 'create_excel_read_template')

# Data verification tools
ComprehensiveDataVerifier = dependency_manager.get_module_item('comprehensive_data_verification', 'ComprehensiveDataVerifier')
verify_data_processing_result = dependency_manager.get_module_item('data_verification', 'verify_data_processing_result')
DataVerificationEngine = dependency_manager.get_module_item('data_verification', 'DataVerificationEngine')

# Enhanced Excel tools
ExcelEnhancedProcessor = dependency_manager.get_module_item('excel_enhanced_tools', 'ExcelEnhancedProcessor')
get_excel_processor = dependency_manager.get_module_item('excel_enhanced_tools', 'get_excel_processor')

# Excel formula tools
parse_excel_formula = dependency_manager.get_module_item('formulas_tools', 'parse_excel_formula')
compile_excel_workbook = dependency_manager.get_module_item('formulas_tools', 'compile_excel_workbook')
execute_excel_formula = dependency_manager.get_module_item('formulas_tools', 'execute_excel_formula')
analyze_excel_dependencies = dependency_manager.get_module_item('formulas_tools', 'analyze_excel_dependencies')
validate_excel_formula = dependency_manager.get_module_item('formulas_tools', 'validate_excel_formula')

# Excel data quality tools
ExcelDataQualityController = dependency_manager.get_module_item('excel_data_quality_tools', 'ExcelDataQualityController')
ExcelCellContentExtractor = dependency_manager.get_module_item('excel_data_quality_tools', 'ExcelCellContentExtractor')
ExcelCharacterConverter = dependency_manager.get_module_item('excel_data_quality_tools', 'ExcelCharacterConverter')
ExcelMultiConditionExtractor = dependency_manager.get_module_item('excel_data_quality_tools', 'ExcelMultiConditionExtractor')
ExcelMultiTableMerger = dependency_manager.get_module_item('excel_data_quality_tools', 'ExcelMultiTableMerger')
ExcelDataCleaner = dependency_manager.get_module_item('excel_data_quality_tools', 'ExcelDataCleaner')
ExcelBatchProcessor = dependency_manager.get_module_item('excel_data_quality_tools', 'ExcelBatchProcessor')

# Error handling tools
VariableLifecycleManager = dependency_manager.get_module_item('df_processed_error_handler', 'VariableLifecycleManager')
NameErrorHandler = dependency_manager.get_module_item('df_processed_error_handler', 'NameErrorHandler')
enhanced_execute_with_error_handling = dependency_manager.get_module_item('df_processed_error_handler', 'enhanced_execute_with_error_handling')
analyze_code_variables = dependency_manager.get_module_item('df_processed_error_handler', 'analyze_code_variables')

# Column checker
ColumnChecker = dependency_manager.get_module_item('column_checker', 'ColumnChecker')

from security.code_executor import execute_code_safely
# from enhanced_excel_helper import smart_read_excel, detect_file_encoding, validate_excel_data_integrity
# (replaced by the equivalent functionality in excel_helper)
from excel_helper import _suggest_excel_read_parameters, detect_excel_structure
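# The lookups above all degrade to None when a module is missing, so tool
# implementations can gate on availability instead of raising ImportError.
# A minimal usage sketch of that pattern (illustrative only):
#
#     if dependency_manager.is_available('formulas_tools'):
#         fn = dependency_manager.get_module_item('formulas_tools', 'parse_excel_formula')
#         result = fn('=SUM(A1:A10)')
#     else:
#         result = {"success": False, "error": "formulas_tools not installed"}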
# Standardized error handling
class ErrorType(Enum):
    """Error type enumeration."""
    FILE_NOT_FOUND = "file_not_found"
    PERMISSION_DENIED = "permission_denied"
    INVALID_FORMAT = "invalid_format"
    PROCESSING_ERROR = "processing_error"
    DEPENDENCY_ERROR = "dependency_error"
    VALIDATION_ERROR = "validation_error"
    EXECUTION_ERROR = "execution_error"
    MODULE_NOT_FOUND = "module_not_found"
    # Error types referenced by the file and execution tools below
    FILE_ACCESS_ERROR = "file_access_error"
    FILE_READ_ERROR = "file_read_error"
    FILE_WRITE_ERROR = "file_write_error"
    UNKNOWN_ERROR = "unknown_error"


class StandardErrorHandler:
    """Standardized error handler."""

    @staticmethod
    def create_error_response(error_type: ErrorType, message: str,
                              details: Optional[Dict[str, Any]] = None,
                              suggestions: Optional[List[str]] = None) -> Dict[str, Any]:
        """Build a standardized error response."""
        response = {
            "success": False,
            "error_type": error_type.value,
            "message": message,
            "timestamp": time.time()
        }
        if details:
            response["details"] = details
        if suggestions:
            response["suggestions"] = suggestions
        return response

    @staticmethod
    def create_success_response(data: Any, message: str = "Operation succeeded",
                                metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Build a standardized success response.

        Args:
            data: Response payload
            message: Success message
            metadata: Optional extra metadata attached to the response
        """
        response = {
            "success": True,
            "message": message,
            "data": data,
            "timestamp": time.time()
        }
        if metadata:
            response["metadata"] = metadata
        return response

    @staticmethod
    def handle_exception(e: Exception, context: str = "") -> Dict[str, Any]:
        """Map a raised exception onto a standardized error response."""
        error_mapping = {
            FileNotFoundError: ErrorType.FILE_NOT_FOUND,
            PermissionError: ErrorType.PERMISSION_DENIED,
            ValueError: ErrorType.INVALID_FORMAT,
            ImportError: ErrorType.DEPENDENCY_ERROR,
        }
        error_type = error_mapping.get(type(e), ErrorType.UNKNOWN_ERROR)
        details = {
            "exception_type": type(e).__name__,
            "traceback": traceback.format_exc(),
            "context": context
        }
        return StandardErrorHandler.create_error_response(error_type, str(e), details)


# Initialize the global error handler
error_handler = StandardErrorHandler()

# Shared constants
MAX_FILE_SIZE = 999999999999  # effectively no file size limit
BLACKLIST = []  # blacklist fully cleared: all operations allowed, including subprocess
TEMPLATE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "templates")
CHARTS_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "charts")


def initialize_data_quality_tools() -> Dict[str, Any]:
    """Safely instantiate the optional data quality tools."""
    tools = {}
    tool_classes = {
        'data_quality_controller': ExcelDataQualityController,
        'cell_content_extractor': ExcelCellContentExtractor,
        'character_converter': ExcelCharacterConverter,
        'multi_condition_extractor': ExcelMultiConditionExtractor,
        'multi_table_merger': ExcelMultiTableMerger,
        'data_cleaner': ExcelDataCleaner,
        'batch_processor': ExcelBatchProcessor,
    }
    for key, cls in tool_classes.items():
        if cls:
            try:
                tools[key] = cls()
            except Exception as e:
                logging.warning(f"Failed to initialize {cls.__name__}: {e}")
    return tools


# Initialize the data quality tools
data_quality_tools = initialize_data_quality_tools()
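# A quick sketch of the response shapes the handler produces (values are
# illustrative; timestamps are epoch floats from time.time()):
#
#     try:
#         open("/nonexistent.xlsx")
#     except Exception as exc:
#         resp = error_handler.handle_exception(exc, context="demo")
#     # resp -> {"success": False, "error_type": "file_not_found",
#     #          "message": "...", "timestamp": ..., "details": {...}}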
# Backward-compatible tool instance accessors
data_quality_controller = data_quality_tools.get('data_quality_controller')
cell_content_extractor = data_quality_tools.get('cell_content_extractor')
character_converter = data_quality_tools.get('character_converter')
multi_condition_extractor = data_quality_tools.get('multi_condition_extractor')
multi_table_merger = data_quality_tools.get('multi_table_merger')
data_cleaner = data_quality_tools.get('data_cleaner')
batch_processor = data_quality_tools.get('batch_processor')

mcp = FastMCP("chatExcel")

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('chatexcel_mcp.log')
    ]
)
logger = logging.getLogger(__name__)


# Backward-compatible helpers wrapping the standardized error handler
def create_error_response(error_type: str, message: str, details: dict = None,
                          solutions: list = None) -> dict:
    """Create a uniformly formatted error response (backward-compatible version).

    Args:
        error_type: Error type name
        message: Error message
        details: Error details
        solutions: Suggested solutions

    Returns:
        dict: Standardized error response
    """
    # Map legacy error type names onto the new enum
    error_type_mapping = {
        "FILE_NOT_FOUND": ErrorType.FILE_NOT_FOUND,
        "PERMISSION_DENIED": ErrorType.PERMISSION_DENIED,
        "INVALID_FORMAT": ErrorType.INVALID_FORMAT,
        "PROCESSING_ERROR": ErrorType.PROCESSING_ERROR,
        "DEPENDENCY_ERROR": ErrorType.DEPENDENCY_ERROR,
        "VALIDATION_ERROR": ErrorType.VALIDATION_ERROR,
    }
    mapped_error_type = error_type_mapping.get(error_type, ErrorType.UNKNOWN_ERROR)
    response = StandardErrorHandler.create_error_response(mapped_error_type, message, details)
    # Attach solutions if provided
    if solutions:
        response["solutions"] = solutions
    return response


def create_success_response(data: dict, message: str = "Operation completed successfully") -> dict:
    """Create a uniformly formatted success response
    (deprecated; use error_handler.create_success_response instead).
    """
    global error_handler
    return error_handler.create_success_response(data, message)


def get_template_path(template_name: str) -> str:
    """Return the absolute path of a template file.

    Args:
        template_name: Template file name

    Returns:
        str: Absolute path of the template

    Raises:
        FileNotFoundError: If the template does not exist
    """
    template_path = os.path.join(TEMPLATE_DIR, template_name)
    if not os.path.exists(template_path):
        raise FileNotFoundError(f"Template file not found: {template_path}")
    return template_path


def validate_file_access(file_path: str) -> dict:
    """Validate file access and size (lenient mode).

    Args:
        file_path: File path

    Returns:
        dict: Validation result with status information
    """
    # Lenient mode: a missing file is still allowed (the code may create it)
    if not os.path.exists(file_path):
        logger.warning(f"File does not exist, execution allowed anyway: {file_path}")
        return error_handler.create_success_response(
            {"file_size": 0, "note": "File does not exist; it may be created"},
            message="File access validated"
        )
    file_size = os.path.getsize(file_path)
    # No file size limit is enforced
    logger.debug(f"File size: {file_size / (1024*1024):.1f}MB")
    return error_handler.create_success_response(
        {"file_size": file_size},
        message="File access validated"
    )
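# Example: gate a tool on validate_file_access before touching the file
# (the path below is hypothetical):
#
#     check = validate_file_access("/tmp/sales.xlsx")
#     if check.get("success"):
#         size = check["data"]["file_size"]   # 0 if the file does not exist yet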
@mcp.tool()
def read_metadata(file_path: str) -> dict:
    """Read CSV file metadata and return it in an MCP-compatible format.

    Args:
        file_path: Absolute path of the CSV file

    Returns:
        dict: Structured metadata with column info, file info and status
    """
    logger.info(f"Reading file metadata: {file_path}")
    try:
        # Validate file access
        validation_result = validate_file_access(file_path)
        if not validation_result.get("success", False):
            return {
                "status": "ERROR",
                "error": "FILE_ACCESS_FAILED",
                "message": validation_result.get("message", "File access validation failed"),
                "details": validation_result
            }

        # Detect encoding and delimiter
        with open(file_path, 'rb') as f:
            rawdata = f.read(50000)
        enc = detect(rawdata)['encoding'] or 'utf-8'
        with open(file_path, 'r', encoding=enc) as f:
            first_line = f.readline()
            delimiter = ',' if ',' in first_line else '\t' if '\t' in first_line else ';'

        # File size from the validation step
        file_size = validation_result.get("data", {}).get("file_size", 0)

        # Get actual row count efficiently
        try:
            # Count total rows without loading data
            total_rows = sum(1 for _ in open(file_path, 'r', encoding=enc)) - 1  # -1 for header
        except Exception:
            # Fallback: read only the first column with pandas to count rows
            temp_df = pd.read_csv(file_path, encoding=enc, delimiter=delimiter, usecols=[0])
            total_rows = len(temp_df)

        # Check file size limit
        if file_size > MAX_FILE_SIZE:
            return {
                "status": "ERROR",
                "error": "FILE_TOO_LARGE",
                "max_size": f"{MAX_FILE_SIZE / 1024 / 1024}MB",
                "actual_size": f"{file_size / 1024 / 1024:.1f}MB"
            }

        # Read sample data for metadata analysis
        sample_size = min(100, total_rows)
        df = pd.read_csv(file_path, encoding=enc, delimiter=delimiter, nrows=sample_size)

        # Calculate additional metadata
        columns_metadata = []
        for col in df.columns:
            col_meta = {
                "name": col,
                "type": str(df[col].dtype),
                "sample": df[col].dropna().iloc[:2].tolist(),
                "stats": {
                    "null_count": df[col].isnull().sum(),
                    "unique_count": df[col].nunique(),
                    "is_numeric": pd.api.types.is_numeric_dtype(df[col])
                },
                "warnings": [],
                "suggested_operations": []
            }
            # Add numeric stats if applicable
            if pd.api.types.is_numeric_dtype(df[col]):
                col_meta["stats"].update({
                    "min": df[col].min(),
                    "max": df[col].max(),
                    "mean": df[col].mean(),
                    "std": df[col].std()
                })
                col_meta["suggested_operations"].extend([
                    "normalize", "scale", "log_transform"
                ])
            # Add categorical stats if applicable
            if pd.api.types.is_string_dtype(df[col]):
                col_meta["suggested_operations"].extend([
                    "one_hot_encode", "label_encode", "text_processing"
                ])
            # Add datetime detection
            if pd.api.types.is_datetime64_any_dtype(df[col]):
                col_meta["suggested_operations"].extend([
                    "extract_year", "extract_month", "time_delta"
                ])
            # Add warnings
            if df[col].isnull().sum() > 0:
                col_meta["warnings"].append(f"{df[col].isnull().sum()} null values found")
            if df[col].nunique() == 1:
                col_meta["warnings"].append("Column contains only one unique value")
            if pd.api.types.is_numeric_dtype(df[col]) and df[col].abs().max() > 1e6:
                col_meta["warnings"].append("Large numeric values detected - consider scaling")
            columns_metadata.append(col_meta)

        from pandas.api.types import infer_dtype

        # Format the concise response
        summary = {
            "status": "SUCCESS",
            "file_info": {
                "size": f"{file_size / 1024:.1f}KB",
                "encoding": enc,
                "delimiter": delimiter
            },
            "dataset": {
                "total_rows": total_rows,
                "sample_rows": len(df),
                "columns": len(df.columns),
                "column_types": {col: infer_dtype(df[col]) for col in df.columns}
            },
            "warnings": {
                "message": "Data quality issues detected" if (
                    df.isnull().any().any() or df.duplicated().any() or (df.nunique() == 1).any()
                ) else "No significant data quality issues found",
                **({
                    "null_columns": {
                        "count": sum(df.isnull().any()),
                        "columns": [col for col in df.columns if df[col].isnull().any()]
                    }
                } if sum(df.isnull().any()) > 0 else {}),
                **({"total_nulls": df.isnull().sum().sum()} if df.isnull().sum().sum() > 0 else {}),
                **({
                    "duplicate_rows": {
                        "count": df.duplicated().sum(),
                        "rows": df[df.duplicated()].index.tolist()
                    }
                } if df.duplicated().sum() > 0 else {}),
                **({
                    "single_value_columns": {
                        "count": sum(df.nunique() == 1),
                        "columns": [col for col in df.columns if df[col].nunique() == 1]
                    }
                } if sum(df.nunique() == 1) > 0 else {})
            }
        }
        return summary
    except Exception as e:
        return {
            "status": "ERROR",
            "error_type": type(e).__name__,
            "message": str(e),
            "solution": [
                "Check if the file is being used by another program",
                "Try saving the file as UTF-8 encoded CSV",
                "Contact the administrator to check MCP file access permissions"
            ],
            "traceback": traceback.format_exc()
        }
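# Example result shape (hypothetical CSV path; values illustrative):
#
#     meta = read_metadata("/data/orders.csv")
#     # meta["dataset"] -> {"total_rows": ..., "sample_rows": <=100,
#     #                     "columns": ..., "column_types": {...}}
#     # meta["warnings"] flags nulls, duplicates and single-value columns
#     # detected in the sampled rows.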
@mcp.tool()
def verify_data_integrity(original_file: str, processed_data: Optional[str] = None,
                          comparison_file: Optional[str] = None,
                          verification_type: str = "basic") -> dict:
    """Data integrity verification and reconciliation tool.

    Args:
        original_file: Path of the original Excel file
        processed_data: Processed data (JSON string) or a file path
        comparison_file: Another Excel file to compare against (optional)
        verification_type: Verification type ("basic", "detailed", "statistical")

    Returns:
        dict: Verification report
    """
    try:
        # from data_verification import DataVerificationEngine, verify_data_processing_result
        # (built-in verification functionality is used instead)

        # Validate access to the original file
        validation_result = validate_file_access(original_file)
        if not validation_result.get("success", False):
            return {
                "success": False,
                "error": validation_result.get("message", "File access failed"),
                "suggestion": "Make sure the original file path is correct and the file exists."
            }

        # Create the verification engine
        verifier = DataVerificationEngine()

        if comparison_file:
            # File comparison mode
            validation_result2 = validate_file_access(comparison_file)
            if not validation_result2.get("success", False):
                return {
                    "success": False,
                    "error": f"Comparison file access failed: {validation_result2.get('message', 'unknown error')}",
                    "suggestion": "Make sure the comparison file path is correct and the file exists."
                }

            # Read both files for comparison
            df1 = pd.read_excel(original_file)
            df2 = pd.read_excel(comparison_file)
            comparison_result = verifier.compare_dataframes(
                df1, df2,
                name1=os.path.basename(original_file),
                name2=os.path.basename(comparison_file)
            )

            # Enhanced data quality checks
            quality_result1 = data_quality_controller.comprehensive_quality_check(
                original_file, "comprehensive"
            )
            quality_result2 = data_quality_controller.comprehensive_quality_check(
                comparison_file, "comprehensive"
            )
            comparison_result["enhanced_quality_analysis"] = {
                "original_file_quality": quality_result1,
                "comparison_file_quality": quality_result2,
                "quality_comparison": {
                    "score_difference": quality_result1.get("overall_score", 0) - quality_result2.get("overall_score", 0),
                    "better_quality_file": "original" if quality_result1.get("overall_score", 0) > quality_result2.get("overall_score", 0) else "comparison"
                }
            }

            return {
                "success": True,
                "verification_type": "file_comparison",
                "comparison_result": comparison_result,
                "summary": {
                    "files_compared": [original_file, comparison_file],
                    "match_score": comparison_result.get("match_score", 0),
                    "has_differences": comparison_result.get("has_differences", True)
                }
            }

        elif processed_data:
            # Processing result verification mode
            try:
                # Try to parse the processed data
                if isinstance(processed_data, str):
                    if processed_data.endswith(('.xlsx', '.xls', '.csv')):
                        # Treat it as a file path
                        if processed_data.endswith('.csv'):
                            processed_df = pd.read_csv(processed_data)
                        else:
                            processed_df = pd.read_excel(processed_data)
                    else:
                        # Treat it as a JSON string
                        import json
                        data_dict = json.loads(processed_data)
                        processed_df = pd.DataFrame(data_dict)
                else:
                    return {
                        "success": False,
                        "error": "Unsupported processed data format",
                        "suggestion": "Provide a JSON string, CSV or Excel file path."
                    }

                # Run the verification
                verification_result = verify_data_processing_result(
                    original_file, processed_df, verification_type
                )
                return {
                    "success": True,
                    "verification_type": "processing_verification",
                    "verification_result": verification_result
                }
            except Exception as parse_error:
                return {
                    "success": False,
                    "error": f"Failed to parse data: {str(parse_error)}",
                    "suggestion": "Check that the processed data format is correct."
                }

        else:
            # Basic integrity check mode
            df = pd.read_excel(original_file)

            # Run a basic integrity check
            integrity_report = {
                "file_info": {
                    "path": original_file,
                    "shape": df.shape,
                    "columns": list(df.columns),
                    "dtypes": df.dtypes.to_dict()
                },
                "data_quality": {
                    "total_rows": len(df),
                    "total_columns": len(df.columns),
                    "null_counts": df.isnull().sum().to_dict(),
                    "null_percentage": (df.isnull().sum() / len(df) * 100).to_dict(),
                    "duplicate_rows": df.duplicated().sum(),
                    "memory_usage": df.memory_usage(deep=True).to_dict()
                },
                "column_analysis": {}
            }
"null_count": df[col].isnull().sum(), "unique_count": df[col].nunique(), "sample_values": df[col].dropna().head(3).tolist() } if df[col].dtype in ['int64', 'float64']: col_info.update({ "min": df[col].min(), "max": df[col].max(), "mean": df[col].mean(), "std": df[col].std() }) integrity_report["column_analysis"][col] = col_info return { "success": True, "verification_type": "integrity_check", "integrity_report": integrity_report } except Exception as e: return { "success": False, "error": { "type": type(e).__name__, "message": str(e), "traceback": traceback.format_exc() }, "suggestion": "请检查输入参数和文件格式是否正确。" } @mcp.tool() def read_excel_metadata(file_path: str) -> dict: """增强版Excel文件元数据读取,支持智能编码检测和完整性验证。 Args: file_path: Excel文件路径 Returns: dict: 包含文件元数据、编码信息、完整性验证结果的字典 """ try: # 验证文件访问 validation_result = validate_file_access(file_path) if not validation_result.get("success", False): return validation_result # 智能编码检测 from utils.encoding_detector import EncodingDetector encoding, confidence = EncodingDetector.detect_file_encoding(file_path) encoding_info = {'encoding': encoding, 'confidence': confidence} # 使用智能读取功能 try: from services.excel_service import ExcelService excel_service = ExcelService() df = excel_service.smart_read_excel(file_path) read_params = {} except Exception as e: return { "status": "ERROR", "error": "SMART_READ_FAILED", "message": f"智能读取失败: {str(e)}", "encoding_info": encoding_info } # 数据完整性验证 (暂时禁用,函数已删除) # integrity_result = validate_excel_data_integrity(file_path, df) integrity_result = {'status': 'success', 'message': '数据完整性验证跳过'} # 增强的数据质量检查 (使用性能优化版本) quality_result = data_quality_controller.comprehensive_quality_check( file_path, "standard" # 使用 standard 级别以提升性能 ) # 高级单元格内容分析 (使用性能优化增强版本) cell_analysis = cell_content_extractor.extract_cell_content_advanced( file_path=file_path, cell_range="A1:Z1000", # 使用优化后的扩展范围 extract_type="all", # 保持智能提取类型,支持完整分析 max_cells=26000 # 调整为默认限制值26000个单元格 ) # 获取所有工作表信息 workbook = openpyxl.load_workbook(file_path, read_only=True) sheet_names = workbook.sheetnames sheets_info = {} for sheet_name in sheet_names: sheet_obj = workbook[sheet_name] sheets_info[sheet_name] = { 'max_row': sheet_obj.max_row, 'max_column': sheet_obj.max_column, 'has_data': sheet_obj.max_row > 1 } workbook.close() # 智能参数推荐(使用增强版) suggested_params = _suggest_excel_read_parameters(file_path, read_params.get('sheet_name')) if _suggest_excel_read_parameters else {} # 使用通用列分析函数 columns_metadata = _analyze_dataframe_columns(df) summary = { "status": "SUCCESS", "file_info": { "size": f"{validation_result.get('data', {}).get('file_size', 0) / 1024:.1f}KB", "sheets": sheet_names, "sheets_info": sheets_info, "encoding": encoding_info }, "dataset": { "total_rows": read_result.get('total_rows', len(df)), "sample_rows": len(df), "columns": len(df.columns), "column_types": { col: str(df[col].dtype) for col in df.columns } }, "read_params": read_params, "suggested_params": suggested_params, "columns_metadata": columns_metadata, "integrity_check": integrity_result, "enhanced_quality_analysis": quality_result, "cell_content_analysis": cell_analysis, "warnings": { "message": "Data quality issues detected" if ( df.isnull().any().any() or df.duplicated().any() or (df.nunique() == 1).any() ) else "No significant data quality issues found", **({ "null_columns": { "count": sum(df.isnull().any()), "columns": [col for col in df.columns if df[col].isnull().any()] } } if sum(df.isnull().any()) > 0 else {}), **({ "total_nulls": df.isnull().sum().sum() } if df.isnull().sum().sum() > 0 else 
@mcp.tool()
def read_excel_metadata(file_path: str) -> dict:
    """Enhanced Excel metadata reader with smart encoding detection and
    integrity verification.

    Args:
        file_path: Excel file path

    Returns:
        dict: File metadata, encoding info and integrity verification results
    """
    try:
        # Validate file access
        validation_result = validate_file_access(file_path)
        if not validation_result.get("success", False):
            return validation_result

        # Smart encoding detection
        from utils.encoding_detector import EncodingDetector
        encoding, confidence = EncodingDetector.detect_file_encoding(file_path)
        encoding_info = {'encoding': encoding, 'confidence': confidence}

        # Smart read
        try:
            from services.excel_service import ExcelService
            excel_service = ExcelService()
            df = excel_service.smart_read_excel(file_path)
            read_params = {}
        except Exception as e:
            return {
                "status": "ERROR",
                "error": "SMART_READ_FAILED",
                "message": f"Smart read failed: {str(e)}",
                "encoding_info": encoding_info
            }

        # Data integrity verification (temporarily disabled; the function was removed)
        # integrity_result = validate_excel_data_integrity(file_path, df)
        integrity_result = {'status': 'success', 'message': 'Data integrity verification skipped'}

        # Enhanced data quality check (performance-optimized variant)
        quality_result = data_quality_controller.comprehensive_quality_check(
            file_path, "standard"  # "standard" level for better performance
        )

        # Advanced cell content analysis (performance-optimized variant)
        cell_analysis = cell_content_extractor.extract_cell_content_advanced(
            file_path=file_path,
            cell_range="A1:Z1000",   # optimized extended range
            extract_type="all",      # smart extraction supporting full analysis
            max_cells=26000          # default limit of 26000 cells
        )

        # Collect information about every sheet
        workbook = openpyxl.load_workbook(file_path, read_only=True)
        sheet_names = workbook.sheetnames
        sheets_info = {}
        for sheet_name in sheet_names:
            sheet_obj = workbook[sheet_name]
            sheets_info[sheet_name] = {
                'max_row': sheet_obj.max_row,
                'max_column': sheet_obj.max_column,
                'has_data': sheet_obj.max_row > 1
            }
        workbook.close()

        # Smart parameter suggestions (enhanced version)
        suggested_params = _suggest_excel_read_parameters(file_path, read_params.get('sheet_name')) if _suggest_excel_read_parameters else {}

        # Shared column analysis helper
        columns_metadata = _analyze_dataframe_columns(df)

        summary = {
            "status": "SUCCESS",
            "file_info": {
                "size": f"{validation_result.get('data', {}).get('file_size', 0) / 1024:.1f}KB",
                "sheets": sheet_names,
                "sheets_info": sheets_info,
                "encoding": encoding_info
            },
            "dataset": {
                "total_rows": len(df),
                "sample_rows": len(df),
                "columns": len(df.columns),
                "column_types": {col: str(df[col].dtype) for col in df.columns}
            },
            "read_params": read_params,
            "suggested_params": suggested_params,
            "columns_metadata": columns_metadata,
            "integrity_check": integrity_result,
            "enhanced_quality_analysis": quality_result,
            "cell_content_analysis": cell_analysis,
            "warnings": {
                "message": "Data quality issues detected" if (
                    df.isnull().any().any() or df.duplicated().any() or (df.nunique() == 1).any()
                ) else "No significant data quality issues found",
                **({
                    "null_columns": {
                        "count": sum(df.isnull().any()),
                        "columns": [col for col in df.columns if df[col].isnull().any()]
                    }
                } if sum(df.isnull().any()) > 0 else {}),
                **({"total_nulls": df.isnull().sum().sum()} if df.isnull().sum().sum() > 0 else {}),
                **({
                    "duplicate_rows": {
                        "count": df.duplicated().sum(),
                        "rows": df[df.duplicated()].index.tolist()
                    }
                } if df.duplicated().sum() > 0 else {}),
                **({
                    "single_value_columns": {
                        "count": sum(df.nunique() == 1),
                        "columns": [col for col in df.columns if df[col].nunique() == 1]
                    }
                } if sum(df.nunique() == 1) > 0 else {})
            }
        }
        return summary
    except Exception as e:
        return {
            "status": "ERROR",
            "error_type": type(e).__name__,
            "message": str(e),
            "solution": [
                "Ensure the file is a valid Excel file (.xlsx)",
                "Check if the file is being used by another program",
                "Contact the administrator to check MCP file access permissions"
            ],
            "traceback": traceback.format_exc()
        }


def _read_csv_with_smart_encoding(file_path: str):
    """Optimized CSV reader using smart encoding detection with caching.

    Args:
        file_path: CSV file path

    Returns:
        pandas.DataFrame or None: The DataFrame, or None on failure
    """
    # Use the standard pandas library
    import pandas as pd
    try:
        # First try to detect the file encoding
        from utils.encoding_detector import EncodingDetector
        encoding, confidence = EncodingDetector.detect_file_encoding(file_path)
        detected_encoding = encoding

        # Try reading with the detected encoding
        try:
            return pd.read_csv(file_path, encoding=detected_encoding)
        except UnicodeDecodeError:
            # Fall back to common encodings if the detected one fails
            common_encodings = ['utf-8', 'gbk', 'gb2312', 'gb18030', 'big5', 'latin1', 'cp1252']
            for enc in common_encodings:
                try:
                    return pd.read_csv(file_path, encoding=enc)
                except (UnicodeDecodeError, UnicodeError):
                    continue
            return None
    except Exception as e:
        logger.error(f"Failed to read CSV file {file_path}: {str(e)}")
        return None


def _execute_code_safely(code: str, df, file_path: Optional[str] = None) -> dict:
    """Shared helper to execute code safely.

    Args:
        code: Code string to execute
        df: pandas DataFrame

    Returns:
        dict: Execution result
    """
    import sys
    from io import StringIO
    # Use the standard pandas library
    import pandas as pd

    # Capture stdout
    old_stdout = sys.stdout
    sys.stdout = captured_output = StringIO()
    try:
        # Prefer the secure code executor when available
        SecureCodeExecutor = dependency_manager.get_module_item('security.secure_code_executor', 'SecureCodeExecutor')
        if SecureCodeExecutor:
            executor = SecureCodeExecutor(
                max_memory_mb=256,
                max_execution_time=30,
                enable_ast_analysis=False
            )
            context = {'df': df, 'pd': pd, 'file_path': file_path}
            execution_result = executor.execute_code(code, context)
            if not execution_result['success']:
                return {
                    'success': False,
                    'error': execution_result.get('error', 'Unknown error'),
                    'output': execution_result.get('output', captured_output.getvalue()),
                    'suggestions': execution_result.get('suggestions', [])
                }
            # Format the execution result
            result_data = _format_execution_result(execution_result.get('result'), pd)
            return {
                'success': True,
                'output': execution_result.get('output', captured_output.getvalue()),
                'result': result_data
            }
        else:
            # Plain execution when no secure executor is available
            context = {'df': df, 'pd': pd, 'file_path': file_path}
            exec(code, context)
            result_data = _format_execution_result(context.get('result'), pd)
            return {
                'success': True,
                'output': captured_output.getvalue(),
                'result': result_data
            }
    except Exception as e:
        return {
            'success': False,
            'error': str(e),
            'output': captured_output.getvalue(),
            'suggestions': [
                'Check code syntax',
                'Ensure operations are valid for the data'
            ]
        }
    finally:
        sys.stdout = old_stdout
def _format_execution_result(result, pd) -> dict:
    """Format an execution result.

    Args:
        result: Execution result
        pd: The pandas module

    Returns:
        dict: Formatted result
    """
    if result is None:
        return None
    if pd and isinstance(result, pd.DataFrame):
        return {
            "type": "DataFrame",
            "shape": result.shape,
            "columns": result.columns.tolist(),
            "data": result.head(10).to_dict('records'),
            "dtypes": result.dtypes.astype(str).to_dict()
        }
    elif pd and isinstance(result, pd.Series):
        return {
            "type": "Series",
            "name": result.name,
            "length": len(result),
            "data": result.head(10).tolist(),
            "dtype": str(result.dtype)
        }
    else:
        return {
            "type": type(result).__name__,
            "value": str(result)
        }


def _analyze_dataframe_columns(df) -> list:
    """Shared DataFrame column analysis helper.

    Args:
        df: pandas DataFrame

    Returns:
        list: Column metadata entries
    """
    # Use the standard pandas library
    import pandas as pd

    columns_metadata = []
    for col in df.columns:
        col_meta = {
            "name": col,
            "type": str(df[col].dtype),
            "sample": df[col].dropna().iloc[:2].tolist() if len(df[col].dropna()) > 0 else [],
            "stats": {
                "null_count": int(df[col].isnull().sum()),
                "unique_count": int(df[col].nunique()),
                "is_numeric": pd.api.types.is_numeric_dtype(df[col])
            },
            "warnings": [],
            "suggested_operations": []
        }
        # Numeric column analysis
        if pd.api.types.is_numeric_dtype(df[col]):
            col_meta["stats"].update({
                "min": float(df[col].min()) if not df[col].empty else None,
                "max": float(df[col].max()) if not df[col].empty else None,
                "mean": float(df[col].mean()) if not df[col].empty else None,
                "std": float(df[col].std()) if not df[col].empty else None
            })
            col_meta["suggested_operations"].extend([
                "normalize", "scale", "log_transform"
            ])
        # String column analysis
        if pd.api.types.is_string_dtype(df[col]):
            col_meta["suggested_operations"].extend([
                "one_hot_encode", "label_encode", "text_processing"
            ])
        # Datetime column analysis
        if pd.api.types.is_datetime64_any_dtype(df[col]):
            col_meta["suggested_operations"].extend([
                "extract_year", "extract_month", "time_delta"
            ])
        # Data quality warnings
        if df[col].isnull().sum() > 0:
            col_meta["warnings"].append(f"{df[col].isnull().sum()} null values found")
        if df[col].nunique() == 1:
            col_meta["warnings"].append("Column contains only one unique value")
        if pd.api.types.is_numeric_dtype(df[col]) and not df[col].empty and df[col].abs().max() > 1e6:
            col_meta["warnings"].append("Large numeric values detected - consider scaling")
        columns_metadata.append(col_meta)
    return columns_metadata
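# Example: run a snippet against an in-memory frame and inspect the formatted
# result (a direct call for illustration, bypassing MCP; output is roughly):
#
#     demo_df = pd.DataFrame({"x": [1, 2, 3]})
#     out = _execute_code_safely("result = df['x'].sum()", demo_df)
#     # out -> {'success': True, 'output': '',
#     #         'result': {'type': 'int64', 'value': '6'}}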
def smart_column_matcher(target_column: str, available_columns: list) -> dict:
    """Smart column name matching helper.

    Args:
        target_column: Target column name
        available_columns: List of available column names

    Returns:
        dict: Match results and suggestions
    """
    import difflib
    import re

    result = {
        'exact_match': None,
        'close_matches': [],
        'suggestions': [],
        'normalized_matches': []
    }

    # 1. Exact match
    if target_column in available_columns:
        result['exact_match'] = target_column
        return result

    # 2. Case-insensitive match
    target_lower = target_column.lower()
    for col in available_columns:
        if col.lower() == target_lower:
            result['exact_match'] = col
            result['suggestions'].append(f"Found a match differing only in case: '{col}'")
            return result

    # 3. Match after stripping spaces, underscores and hyphens
    target_normalized = re.sub(r'[\s_-]', '', target_column.lower())
    for col in available_columns:
        col_normalized = re.sub(r'[\s_-]', '', col.lower())
        if col_normalized == target_normalized:
            result['normalized_matches'].append(col)

    # 4. Fuzzy matching
    close_matches = difflib.get_close_matches(
        target_column, available_columns, n=5, cutoff=0.6
    )
    result['close_matches'] = close_matches

    # 5. Chinese column-name variant matching
    chinese_variants = {
        '消耗日期': ['消费日期', '使用日期', '支出日期', '花费日期', '消耗时间', '消费时间'],
        '消费日期': ['消耗日期', '使用日期', '支出日期', '花费日期', '消费时间', '消耗时间'],
        '日期': ['时间', 'Date', 'date', '创建日期', '更新日期', '记录日期'],
        '金额': ['数量', '价格', '费用', '成本', 'Amount', 'amount', '总额'],
        '名称': ['姓名', '品名', '项目', 'Name', 'name', '标题'],
        '类型': ['分类', '种类', 'Type', 'type', '类别']
    }
    if target_column in chinese_variants:
        for variant in chinese_variants[target_column]:
            if variant in available_columns:
                result['suggestions'].append(f"Found a similar column name: '{variant}'")

    return result
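# Example (direct call): matching a near-miss column name. After stripping
# spaces/underscores, "sales amount" normalizes to the same key as
# "Sales_Amount", so it lands in normalized_matches (and likely close_matches):
#
#     smart_column_matcher("sales amount", ["Sales_Amount", "Date", "Region"])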
suggestions=["检查代码语法", "确认数据格式正确"] ) result = error.to_dict() result["execution_time"] = execution_time return result logger.error(f"执行Excel代码时出错: {e}", exc_info=True) return error_handler.handle_exception(e, context="run_excel_code") @mcp.tool() def run_code(code: str, file_path: str) -> dict: """在CSV文件上执行数据处理代码,具备安全检查功能。 Args: code: 要执行的数据处理代码字符串。 file_path: CSV文件路径。 Returns: dict: 执行结果,包含数据、输出或错误信息。 """ global error_handler try: # 验证文件访问 validation_result = validate_file_access(file_path) if not validation_result.get("success", False): return error_handler.create_error_response( ErrorType.FILE_ACCESS_ERROR, validation_result.get("message", "文件访问验证失败"), suggestions=["Check the file path and ensure the file exists"] ) # 优化的CSV文件读取 - 使用缓存的编码检测结果 df = _read_csv_with_smart_encoding(file_path) if df is None: return error_handler.create_error_response( ErrorType.FILE_READ_ERROR, "Failed to read CSV file with any supported encoding", suggestions=[ "Check the file encoding and format", "Try converting the file to UTF-8 format", "Ensure the file is a valid CSV format" ] ) return execute_code_safely(code, df, file_path) # 执行代码,并注入 file_path 变量 execution_result = _execute_code_safely(code, df, file_path=file_path) if not execution_result['success']: return error_handler.create_error_response( ErrorType.EXECUTION_ERROR, execution_result.get('error', 'Code execution failed'), suggestions=execution_result.get('suggestions', [ "Check data processing syntax", "Ensure operations are valid for the data" ]) ) return error_handler.create_success_response( execution_result.get('result'), message="Code executed successfully", metadata={ "output": execution_result.get('output', ''), "suggestion": "Use 'result' variable to store your final output." } ) except Exception as e: return error_handler.create_error_response( ErrorType.EXECUTION_ERROR, f"Unexpected error during code execution: {str(e)}", suggestions=[ "Check data processing syntax", "Ensure operations are valid for the data", "Contact support if the issue persists" ] ) @mcp.tool() def bar_chart_to_html( categories: list, values: list, title: str = "Interactive Chart", ) -> dict: """Generate interactive HTML bar chart using Chart.js template. Args: categories: List of category names for x-axis values: List of numeric values for y-axis title: Chart title (default: "Interactive Chart") x_label: Label for X-axis (default: "Categories") y_label: Label for Y-axis (default: "Values") Returns: dict: Contains file path and status information Example: >>> bar_chart_to_html( ... categories=['Electronics', 'Clothing', 'Home Goods', 'Sports Equipment'], ... values=[120000, 85000, 95000, 60000], ... title="Q1 Sales by Product Category" ... 
) { "status": "SUCCESS", "filepath": "/absolute/path/to/plotXXXXXX.html", } """ # Validate input lengths if len(categories) != len(values): return error_handler.create_error_response( ErrorType.VALIDATION_ERROR, f"Categories ({len(categories)}) and values ({len(values)}) must be same length", suggestions=["Ensure categories and values lists have the same length"] ) # Read template file template_path = get_template_path("barchart_template.html") try: with open(template_path, 'r', encoding='utf-8') as f: template = f.read() except Exception as e: return error_handler.create_error_response( ErrorType.FILE_READ_ERROR, f"Failed to read chart template: {str(e)}", suggestions=["Check if template files exist", "Verify file permissions"] ) # Prepare data for Chart.js all_categories = categories all_values = values colors = [ "#4e73df", "#1cc88a", "#36b9cc", "#f6c23e", "#e74a3b", "#858796", "#f8f9fc", "#5a5c69", "#6610f2", "#6f42c1", "#e83e8c", "#d63384", "#fd7e14", "#ffc107", "#28a745", "#20c997", "#17a2b8", "#007bff", "#6c757d", "#343a40", "#dc3545", "#ff6b6b", "#4ecdc4", "#1a535c" ][:len(all_categories)] # Inject data into template template = template.replace( 'labels: ["Electronics", "Clothing", "Home Goods", "Sports Equipment"]', f'labels: {json.dumps(all_categories)}' ).replace( 'data: [120000, 85000, 95000, 60000]', f'data: {json.dumps(all_values)}' ).replace( 'backgroundColor: ["#4e73df", "#1cc88a", "#36b9cc", "#f6c23e"]', f'backgroundColor: {json.dumps(colors)}' ).replace( 'Sales by Category (2023)', title ).replace( 'legend: { position: \'top\' },', '' ) # Save to plot directory as HTML charts_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "charts") os.makedirs(charts_dir, exist_ok=True) timestamp = str(int(time.time())) filename = f"chart_{timestamp}.html" filepath = os.path.join(charts_dir, filename) try: with open(filepath, 'w', encoding='utf-8') as f: f.write(template) except Exception as e: return error_handler.create_error_response( ErrorType.FILE_WRITE_ERROR, f"Failed to write chart file: {str(e)}", suggestions=["Check directory permissions", "Ensure sufficient disk space"] ) return error_handler.create_success_response( {"filepath": os.path.abspath(filepath)}, message="Bar chart generated successfully" ) @mcp.tool() def pie_chart_to_html( labels: list, values: list, title: str = "Interactive Pie Chart" ) -> dict: """Generate interactive HTML pie chart using Chart.js template. Args: labels: List of label names for each pie slice values: List of numeric values for each slice title: Chart title (default: "Interactive Pie Chart") Returns: dict: Contains file path and status information Example: >>> pie_chart_to_html( ... labels=['Electronics', 'Clothing', 'Home Goods'], ... values=[120000, 85000, 95000], ... title="Q1 Sales Distribution" ... 
) { "status": "SUCCESS", "filepath": "/absolute/path/to/plotXXXXXX.html", } """ # Validate input lengths if len(labels) != len(values): return error_handler.create_error_response( ErrorType.VALIDATION_ERROR, f"Labels ({len(labels)}) and values ({len(values)}) must be same length", suggestions=["Ensure labels and values lists have the same length"] ) # Read template file template_path = get_template_path("piechart_template.html") try: with open(template_path, 'r', encoding='utf-8') as f: template = f.read() except Exception as e: return error_handler.create_error_response( ErrorType.FILE_READ_ERROR, f"Failed to read chart template: {str(e)}", suggestions=["Check if template files exist", "Verify file permissions"] ) # Prepare data for Chart.js colors = [ "#4e73df", "#1cc88a", "#36b9cc", "#f6c23e", "#e74a3b", "#858796", "#f8f9fc", "#5a5c69", "#6610f2", "#6f42c1", "#e83e8c", "#d63384", "#fd7e14", "#ffc107", "#28a745", "#20c997", "#17a2b8", "#007bff", "#6c757d", "#343a40", "#dc3545", "#ff6b6b", "#4ecdc4", "#1a535c" ][:len(labels)] # Inject data into template template = template.replace( 'labels: ["Apple", "Samsung", "Huawei", "Xiaomi", "Others"]', f'labels: {json.dumps(labels)}' ).replace( 'data: [45, 25, 12, 8, 10]', f'data: {json.dumps(values)}' ).replace( 'backgroundColor: ["#4e73df", "#1cc88a", "#36b9cc", "#f6c23e", "#e74a3b"]', f'backgroundColor: {json.dumps(colors)}' ).replace( 'Global Smartphone Market Share (2023)', title ) # Save to plot directory as HTML charts_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "charts") os.makedirs(charts_dir, exist_ok=True) timestamp = str(int(time.time())) filename = f"chart_{timestamp}.html" filepath = os.path.join(charts_dir, filename) try: with open(filepath, 'w', encoding='utf-8') as f: f.write(template) except Exception as e: return error_handler.create_error_response( ErrorType.FILE_WRITE_ERROR, f"Failed to write chart file: {str(e)}", suggestions=["Check directory permissions", "Ensure sufficient disk space"] ) return error_handler.create_success_response( {"filepath": os.path.abspath(filepath)}, message="Pie chart generated successfully" ) @mcp.tool() def line_chart_to_html( labels: list, datasets: list, title: str = "Interactive Line Chart" ) -> dict: """Generate interactive HTML line chart using Chart.js template. Args: labels: List of label names for x-axis datasets: List of datasets, each containing: - label: Name of the dataset - data: List of numeric values (3 dimensions: [x, y, z]) title: Chart title (default: "Interactive Line Chart") Returns: dict: Contains file path and status information Example: >>> line_chart_to_html( ... labels=['Jan', 'Feb', 'Mar'], ... datasets=[ ... {'label': 'Sales', 'data': [[100, 200, 300], [150, 250, 350], [200, 300, 400]]}, ... {'label': 'Expenses', 'data': [[50, 100, 150], [75, 125, 175], [100, 150, 200]]} ... ], ... title="Monthly Performance" ... 
) { "status": "SUCCESS", "filepath": "/absolute/path/to/plotXXXXXX.html", } """ # Validate input if not all(len(d['data']) == len(labels) for d in datasets): return error_handler.create_error_response( ErrorType.VALIDATION_ERROR, "All datasets must have same length as labels", suggestions=["Ensure all dataset data arrays match the labels length"] ) # Read template file template_path = get_template_path("linechart_template.html") try: with open(template_path, 'r', encoding='utf-8') as f: template = f.read() except Exception as e: return error_handler.create_error_response( ErrorType.FILE_READ_ERROR, f"Failed to read chart template: {str(e)}", suggestions=["Check if template files exist", "Verify file permissions"] ) # Prepare data for Chart.js chart_data = { "labels": labels, "datasets": [] } # Create datasets using main labels for dataset in datasets: chart_data['datasets'].append({ "label": dataset['label'], "data": dataset['data'], "borderColor": '#4e73df', # Default color "backgroundColor": '#4e73df', "borderWidth": 2, "pointRadius": 5, "tension": 0, "fill": False }) # Inject data into template template = template.replace( 'labels: ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]', f'labels: {json.dumps(labels)}' ).replace( 'datasets: [\n' + ' {\n' + ' label: "Electronics",\n' + ' data: [6500, 5900, 8000, 8100, 8600, 8250, 9500, 10500, 12000, 11500, 13000, 15000],\n' + ' borderColor: "#4e73df",\n' + ' backgroundColor: "#4e73df",\n' + ' borderWidth: 2,\n' + ' pointRadius: 5,\n' + ' tension: 0,\n' + ' fill: false\n' + ' },\n' + ' {\n' + ' label: "Clothing",\n' + ' data: [12000, 11000, 12500, 10500, 11500, 13000, 14000, 12500, 11000, 9500, 10000, 12000],\n' + ' borderColor: "#1cc88a",\n' + ' backgroundColor: "#1cc88a",\n' + ' borderWidth: 2,\n' + ' pointRadius: 5,\n' + ' tension: 0,\n' + ' fill: false\n' + ' },\n' + ' {\n' + ' label: "Home Goods",\n' + ' data: [8000, 8500, 9000, 9500, 10000, 10500, 11000, 11500, 12000, 12500, 13000, 13500],\n' + ' borderColor: "#36b9cc",\n' + ' backgroundColor: "#36b9cc",\n' + ' borderWidth: 2,\n' + ' pointRadius: 5,\n' + ' tension: 0,\n' + ' fill: false\n' + ' }\n' + ' ]', f'datasets: {json.dumps(chart_data["datasets"], indent=16)}' ).replace( 'Interactive Sales Trend Dashboard', title ).replace( 'Monthly Sales Trend (2023)', title ) # Save to plot directory as HTML charts_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "charts") os.makedirs(charts_dir, exist_ok=True) timestamp = str(int(time.time())) filename = f"chart_{timestamp}.html" filepath = os.path.join(charts_dir, filename) try: with open(filepath, 'w', encoding='utf-8') as f: f.write(template) except Exception as e: return error_handler.create_error_response( ErrorType.FILE_WRITE_ERROR, f"Failed to write chart file: {str(e)}", suggestions=["Check directory permissions", "Ensure sufficient disk space"] ) return error_handler.create_success_response( {"filepath": os.path.abspath(filepath)}, message="Line chart generated successfully" ) @mcp.tool() def validate_data_quality(file_path: str) -> dict: """验证数据质量并提供改进建议 Args: file_path: 数据文件路径 Returns: dict: 数据质量报告和改进建议 """ try: validation_result = validate_file_access(file_path) if not validation_result.get("success", False): return validation_result df = pd.read_csv(file_path) quality_report = { "basic_info": { "total_rows": len(df), "columns": len(df.columns), "memory_usage": df.memory_usage(deep=True).sum() }, "missing_data": { "total_missing": df.isnull().sum().sum(), "missing_by_column": 
@mcp.tool()
def validate_data_quality(file_path: str) -> dict:
    """Validate data quality and provide improvement suggestions.

    Args:
        file_path: Data file path

    Returns:
        dict: Data quality report and improvement suggestions
    """
    try:
        validation_result = validate_file_access(file_path)
        if not validation_result.get("success", False):
            return validation_result

        df = pd.read_csv(file_path)
        quality_report = {
            "basic_info": {
                "total_rows": len(df),
                "columns": len(df.columns),
                "memory_usage": df.memory_usage(deep=True).sum()
            },
            "missing_data": {
                "total_missing": df.isnull().sum().sum(),
                "missing_by_column": df.isnull().sum().to_dict(),
                "missing_percentage": (df.isnull().sum() / len(df) * 100).to_dict()
            },
            "duplicates": {
                "duplicate_rows": df.duplicated().sum(),
                "duplicate_percentage": df.duplicated().sum() / len(df) * 100
            },
            "data_types": df.dtypes.astype(str).to_dict(),
            "recommendations": []
        }

        # Generate recommendations
        if quality_report["missing_data"]["total_missing"] > 0:
            quality_report["recommendations"].append("Consider handling missing values: drop, fill, or interpolate")
        if quality_report["duplicates"]["duplicate_rows"] > 0:
            quality_report["recommendations"].append("Duplicate rows found; consider deduplication")
        # Check for data type optimization opportunities
        for col, dtype in quality_report["data_types"].items():
            if dtype == 'object':
                if df[col].nunique() / len(df) < 0.5:  # low-cardinality strings
                    quality_report["recommendations"].append(
                        f"Column '{col}' could be converted to a categorical type to save memory"
                    )

        return error_handler.create_success_response(
            quality_report,
            message="Data quality analysis complete"
        )
    except Exception as e:
        return error_handler.create_error_response(
            ErrorType.PROCESSING_ERROR,
            f"Data quality validation failed: {str(e)}",
            suggestions=["Check the file format", "Confirm the file is readable"],
            details={"traceback": traceback.format_exc()}
        )


# Register the new Excel smart tools
@mcp.tool()
def suggest_excel_read_parameters_tool(file_path: str) -> dict:
    """Recommend read parameters for an Excel file.

    Args:
        file_path: Absolute path of the Excel file

    Returns:
        dict: Structured response containing the recommended parameters
    """
    return suggest_excel_read_parameters(file_path)


@mcp.tool()
def detect_excel_file_structure_tool(file_path: str) -> dict:
    """Detect the structure of an Excel file.

    Args:
        file_path: Absolute path of the Excel file

    Returns:
        dict: Response describing the file structure
    """
    return detect_excel_file_structure(file_path)


@mcp.tool()
def create_excel_read_template_tool(file_path: str, sheet_name: Optional[str] = None,
                                    skiprows: Optional[int] = None,
                                    header: Optional[int] = None,
                                    usecols: Optional[str] = None) -> dict:
    """Generate an Excel-reading code template.

    Args:
        file_path: Absolute path of the Excel file
        sheet_name: Sheet name
        skiprows: Number of rows to skip
        header: Header row position
        usecols: Columns to use

    Returns:
        dict: Response containing the code template
    """
    return create_excel_read_template(file_path, sheet_name, skiprows, header, usecols)
@mcp.tool()
def comprehensive_data_verification_tool(
    file_path: str,
    reference_file: Optional[str] = None,
    verification_level: str = "detailed",
    save_report: bool = True
) -> dict:
    """Comprehensive data verification and approval tool.

    Provides full Excel data verification, quality assessment and
    reconciliation. Supports single-file verification and two-file
    comparison modes.

    Args:
        file_path: Path of the Excel file to verify
        reference_file: Reference file path (optional, for comparison)
        verification_level: Verification level
            - "basic": structure and basic statistics
            - "detailed": adds data quality analysis
            - "comprehensive": adds anomaly detection and deep analysis
        save_report: Whether to save the verification report locally

    Returns:
        dict: Verification result with the following fields
            - overall_status: EXCELLENT/GOOD/ACCEPTABLE/POOR/CRITICAL/FAILED
            - data_quality_score: Data quality score (0-100)
            - file_analysis: File structure analysis
            - data_integrity: Data integrity verification
            - comparison_results: Comparison results (if a reference file was given)
            - recommendations: List of improvement suggestions
            - detailed_report: Detailed report (detailed and comprehensive levels)

    Features:
        1. Multi-level verification: basic, detailed and comprehensive
        2. Smart encoding detection with optimized reads
        3. Data quality scoring
        4. Anomaly detection for outliers and data patterns
        5. Detailed comparison against a reference file
        6. Automatic report generation with optional saving
        7. Targeted data improvement suggestions

    Examples:
        - Basic: comprehensive_data_verification_tool("data.xlsx", verification_level="basic")
        - Detailed: comprehensive_data_verification_tool("data.xlsx", verification_level="detailed")
        - Comparison: comprehensive_data_verification_tool("data.xlsx", "reference.xlsx", "comprehensive")
    """
    try:
        # Validate the file paths
        if not os.path.exists(file_path):
            return {
                "success": False,
                "error": f"File does not exist: {file_path}",
                "overall_status": "FAILED"
            }
        if reference_file and not os.path.exists(reference_file):
            return {
                "success": False,
                "error": f"Reference file does not exist: {reference_file}",
                "overall_status": "FAILED"
            }

        # Create the comprehensive verifier
        verifier = ComprehensiveDataVerifier()

        # Run the comprehensive verification
        verification_result = verifier.comprehensive_excel_verification(
            file_path=file_path,
            reference_file=reference_file,
            verification_level=verification_level,
            save_report=save_report
        )

        # Add a success flag
        verification_result["success"] = True

        # Add a verification summary
        verification_result["verification_summary"] = {
            "file_name": os.path.basename(file_path),
            "verification_time": verification_result.get("timestamp"),
            "quality_score": verification_result.get("data_quality_score", 0),
            "status": verification_result.get("overall_status", "UNKNOWN"),
            "has_reference": reference_file is not None,
            "level": verification_level,
            "recommendations_count": len(verification_result.get("recommendations", []))
        }

        return verification_result
    except Exception as e:
        return {
            "success": False,
            "error": f"Error during comprehensive verification: {str(e)}",
            "overall_status": "ERROR",
            "file_path": file_path,
            "reference_file": reference_file,
            "verification_level": verification_level
        }


@mcp.tool()
def batch_data_verification_tool(
    file_paths: list,
    verification_level: str = "detailed",
    save_reports: bool = True
) -> dict:
    """Batch data verification tool.

    Runs verification and quality assessment over multiple Excel files.

    Args:
        file_paths: List of Excel file paths
        verification_level: Verification level ("basic", "detailed", "comprehensive")
        save_reports: Whether to save verification reports

    Returns:
        dict: Batch verification results
            - overall_summary: Overall summary
            - individual_results: Per-file results
            - quality_ranking: Quality ranking
            - batch_recommendations: Batch-level suggestions
    """
    try:
        if not file_paths or not isinstance(file_paths, list):
            return {
                "success": False,
                "error": "Provide a valid list of file paths"
            }

        verifier = ComprehensiveDataVerifier()
        batch_results = {
            "success": True,
            "total_files": len(file_paths),
            "processed_files": 0,
            "failed_files": 0,
            "overall_summary": {},
            "individual_results": {},
            "quality_ranking": [],
            "batch_recommendations": []
        }
        quality_scores = []

        # Verify each file in turn
        for file_path in file_paths:
            try:
                if os.path.exists(file_path):
                    result = verifier.comprehensive_excel_verification(
                        file_path=file_path,
                        verification_level=verification_level,
                        save_report=save_reports
                    )
                    batch_results["individual_results"][file_path] = result
                    quality_scores.append({
                        "file": os.path.basename(file_path),
                        "path": file_path,
                        "score": result.get("data_quality_score", 0),
                        "status": result.get("overall_status", "UNKNOWN")
                    })
                    batch_results["processed_files"] += 1
                else:
                    batch_results["individual_results"][file_path] = {
                        "success": False,
                        "error": "File does not exist",
                        "overall_status": "FAILED"
                    }
                    batch_results["failed_files"] += 1
            except Exception as e:
                batch_results["individual_results"][file_path] = {
                    "success": False,
                    "error": str(e),
                    "overall_status": "ERROR"
                }
                batch_results["failed_files"] += 1

        # Build the quality ranking
        batch_results["quality_ranking"] = sorted(
            quality_scores, key=lambda x: x["score"], reverse=True
        )

        # Build the overall summary
        if quality_scores:
            scores = [item["score"] for item in quality_scores]
            batch_results["overall_summary"] = {
                "average_quality_score": sum(scores) / len(scores),
                "highest_score": max(scores),
                "lowest_score": min(scores),
                "excellent_files": len([s for s in scores if s >= 90]),
                "good_files": len([s for s in scores if 80 <= s < 90]),
                "acceptable_files": len([s for s in scores if 70 <= s < 80]),
                "poor_files": len([s for s in scores if 60 <= s < 70]),
                "critical_files": len([s for s in scores if s < 60])
            }

        # Build the batch-level recommendations
        if batch_results["failed_files"] > 0:
            batch_results["batch_recommendations"].append(
                f"{batch_results['failed_files']} file(s) failed verification; check file formats and paths"
            )
        if quality_scores:
            avg_score = sum([item["score"] for item in quality_scores]) / len(quality_scores)
            if avg_score < 70:
                batch_results["batch_recommendations"].append(
                    "Overall data quality is low; data cleaning and quality improvements are recommended"
                )
            elif avg_score >= 90:
                batch_results["batch_recommendations"].append(
                    "Overall data quality is excellent; the data can be used with confidence"
                )

        return batch_results
    except Exception as e:
        return {
            "success": False,
            "error": f"Error during batch verification: {str(e)}"
        }


@mcp.tool()
def excel_read_enhanced(
    file_path: str,
    sheet_name: Optional[str] = None,
    start_row: Optional[int] = None,
    end_row: Optional[int] = None,
    start_col: Optional[str] = None,
    end_col: Optional[str] = None,
    use_go_service: bool = True
) -> dict:
    """Enhanced Excel reader backed by the Go excelize library for
    high-performance processing.

    Args:
        file_path: Excel file path
        sheet_name: Sheet name (optional)
        start_row: Start row (optional)
        end_row: End row (optional)
        start_col: Start column, e.g. 'A' (optional)
        end_col: End column, e.g. 'Z' (optional)
        use_go_service: Prefer the Go service (default True)

    Returns:
        dict: Read result with data and performance information
    """
    try:
        processor = get_excel_processor()
        result = processor.read_excel_enhanced(
            file_path=file_path,
            sheet_name=sheet_name,
            start_row=start_row,
            end_row=end_row,
            start_col=start_col,
            end_col=end_col,
            use_go=use_go_service
        )
        return result
    except Exception as e:
        return {
            "success": False,
            "error": f"Enhanced Excel read failed: {str(e)}",
            "suggestion": "Check the file path and parameter settings"
        }


@mcp.tool()
def excel_write_enhanced(
    file_path: str,
    data: list,
    sheet_name: Optional[str] = None,
    start_row: Optional[int] = None,
    start_col: Optional[str] = None,
    use_go_service: bool = True
) -> dict:
    """Enhanced Excel writer backed by the Go excelize library for
    high-performance processing.

    Args:
        file_path: Excel file path
        data: Data to write (list of dicts)
        sheet_name: Sheet name (optional)
        start_row: Start row (optional)
        start_col: Start column, e.g. 'A' (optional)
        use_go_service: Prefer the Go service (default True)

    Returns:
        dict: Write result with performance information
    """
    try:
        if not isinstance(data, list):
            return {
                "success": False,
                "error": "Invalid data format; a list of dicts is required",
                "suggestion": "Provide data in the form [{column1: value1, column2: value2}, ...]"
            }
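# Example: excel_write_enhanced expects row dicts keyed by column name
# (hypothetical path; per the use_go flag, the processor falls back to
# pandas when the Go service is unavailable):
#
#     rows = [{"name": "Alice", "score": 90}, {"name": "Bob", "score": 85}]
#     excel_write_enhanced("/tmp/scores.xlsx", data=rows, sheet_name="Sheet1")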
        processor = get_excel_processor()
        result = processor.write_excel_enhanced(
            file_path=file_path,
            data=data,
            sheet_name=sheet_name,
            start_row=start_row,
            start_col=start_col,
            use_go=use_go_service
        )
        return result
    except Exception as e:
        return {
            "success": False,
            "error": f"Enhanced Excel write failed: {str(e)}",
            "suggestion": "Check the file path and data format"
        }


@mcp.tool()
def excel_chart_enhanced(
    file_path: str,
    chart_type: str,
    data_range: str,
    sheet_name: Optional[str] = None,
    title: Optional[str] = None,
    x_axis_title: Optional[str] = None,
    y_axis_title: Optional[str] = None
) -> dict:
    """Enhanced Excel chart creation tool using the Go excelize library for
    high-performance chart generation.

    Args:
        file_path: Excel file path
        chart_type: Chart type ('col', 'line', 'pie', 'bar', 'area', 'scatter', etc.)
        data_range: Data range (e.g. 'A1:B10')
        sheet_name: Sheet name (optional)
        title: Chart title (optional)
        x_axis_title: X-axis title (optional)
        y_axis_title: Y-axis title (optional)

    Returns:
        dict: Chart creation result
    """
    try:
        # Validate the chart type
        valid_chart_types = ['col', 'line', 'pie', 'bar', 'area', 'scatter', 'doughnut']
        if chart_type not in valid_chart_types:
            return {
                "success": False,
                "error": f"Unsupported chart type: {chart_type}",
                "suggestion": f"Supported chart types: {', '.join(valid_chart_types)}"
            }

        processor = get_excel_processor()
        result = processor.create_chart_enhanced(
            file_path=file_path,
            chart_type=chart_type,
            data_range=data_range,
            sheet_name=sheet_name,
            chart_title=title,
            x_axis_title=x_axis_title,
            y_axis_title=y_axis_title
        )
        return result
    except Exception as e:
        return {
            "success": False,
            "error": f"Enhanced chart creation failed: {str(e)}",
            "suggestion": "Check the file path, data range and chart parameters"
        }


@mcp.tool()
def excel_info_enhanced(file_path: str) -> dict:
    """Enhanced Excel file info tool using the Go excelize library for
    detailed file analysis.

    Args:
        file_path: Excel file path

    Returns:
        dict: Detailed file information, including sheets and row/column counts
    """
    try:
        processor = get_excel_processor()
        result = processor.get_file_info_enhanced(file_path)
        return result
    except Exception as e:
        return {
            "success": False,
            "error": f"Failed to get file info: {str(e)}",
            "suggestion": "Check that the file path is correct"
        }
@mcp.tool()
def excel_performance_comparison(
    file_path: str,
    operation: str = "read",
    test_data: Optional[list] = None
) -> dict:
    """
    Excel performance comparison tool that benchmarks the Go service against pandas.

    Args:
        file_path: Path to the Excel file
        operation: Operation type ('read' or 'write')
        test_data: Test data (required for write operations)

    Returns:
        dict: Performance comparison result
    """
    try:
        import time

        results = {
            "success": True,
            "operation": operation,
            "file_path": file_path,
            "performance_comparison": {},
            "recommendation": ""
        }

        processor = get_excel_processor()

        if operation == "read":
            # Benchmark the Go service
            start_time = time.time()
            go_result = processor.read_excel_enhanced(file_path, use_go=True)
            go_time = time.time() - start_time

            # Benchmark pandas
            start_time = time.time()
            pandas_result = processor.read_excel_enhanced(file_path, use_go=False)
            pandas_time = time.time() - start_time

            results["performance_comparison"] = {
                "go_service": {
                    "time_seconds": round(go_time, 4),
                    "success": go_result.get("success", False),
                    "method": go_result.get("data", {}).get("method", "unknown")
                },
                "pandas": {
                    "time_seconds": round(pandas_time, 4),
                    "success": pandas_result.get("success", False),
                    "method": pandas_result.get("data", {}).get("method", "unknown")
                }
            }

            # Compute the speedup
            if go_time > 0 and pandas_time > 0:
                speedup = pandas_time / go_time
                results["performance_comparison"]["speedup"] = round(speedup, 2)

                if speedup > 2:
                    results["recommendation"] = f"The Go service is {speedup:.1f}x faster than pandas; prefer the Go service"
                elif speedup < 0.8:
                    results["recommendation"] = "pandas performs better; prefer pandas"
                else:
                    results["recommendation"] = "Both methods perform similarly; choose either as needed"

        elif operation == "write":
            if not test_data:
                return {
                    "success": False,
                    "error": "The write benchmark requires the test_data parameter",
                    "suggestion": "Provide a list of test data"
                }

            # Create temporary files for the benchmark
            import tempfile
            with tempfile.NamedTemporaryFile(suffix='.xlsx', delete=False) as tmp1, \
                 tempfile.NamedTemporaryFile(suffix='.xlsx', delete=False) as tmp2:

                # Benchmark the Go service
                start_time = time.time()
                go_result = processor.write_excel_enhanced(tmp1.name, test_data, use_go=True)
                go_time = time.time() - start_time

                # Benchmark pandas
                start_time = time.time()
                pandas_result = processor.write_excel_enhanced(tmp2.name, test_data, use_go=False)
                pandas_time = time.time() - start_time

                # Clean up the temporary files
                try:
                    os.unlink(tmp1.name)
                    os.unlink(tmp2.name)
                except OSError:
                    pass

                results["performance_comparison"] = {
                    "go_service": {
                        "time_seconds": round(go_time, 4),
                        "success": go_result.get("success", False),
                        "method": go_result.get("data", {}).get("method", "unknown")
                    },
                    "pandas": {
                        "time_seconds": round(pandas_time, 4),
                        "success": pandas_result.get("success", False),
                        "method": pandas_result.get("data", {}).get("method", "unknown")
                    }
                }

                # Compute the speedup
                if go_time > 0 and pandas_time > 0:
                    speedup = pandas_time / go_time
                    results["performance_comparison"]["speedup"] = round(speedup, 2)

                    if speedup > 2:
                        results["recommendation"] = f"The Go service is {speedup:.1f}x faster than pandas; prefer the Go service"
                    elif speedup < 0.8:
                        results["recommendation"] = "pandas performs better; prefer pandas"
                    else:
                        results["recommendation"] = "Both methods perform similarly; choose either as needed"
        else:
            return {
                "success": False,
                "error": f"Unsupported operation type: {operation}",
                "suggestion": "Supported operation types: 'read', 'write'"
            }

        return results
    except Exception as e:
        return {
            "success": False,
            "error": f"Performance comparison failed: {str(e)}",
            "suggestion": "Check the file path and parameter settings"
        }


# Register Excel formula processing tools
@mcp.tool()
def parse_formula(formula: str, validate_security: bool = False) -> str:
    """Parse an Excel formula.

    Args:
        formula: Excel formula string
        validate_security: Whether to run security validation

    Returns:
        str: Parse result as JSON
    """
    return parse_excel_formula(formula, validate_security)


@mcp.tool()
def compile_workbook(file_path: str, output_format: str = 'python') -> str:
    """Compile an Excel workbook into code.

    Args:
        file_path: Path to the Excel file
        output_format: Output format ('python' or 'json')

    Returns:
        str: Compilation result as JSON
    """
    return compile_excel_workbook(file_path, output_format)


@mcp.tool()
def execute_formula(formula: str, context: str = '{}') -> str:
    """Execute an Excel formula.

    Args:
        formula: Excel formula string
        context: Context data as JSON

    Returns:
        str: Execution result as JSON
    """
    return execute_excel_formula(formula, context)


@mcp.tool()
def analyze_dependencies(file_path: str) -> str:
    """Analyze formula dependencies in an Excel file.

    Args:
        file_path: Path to the Excel file

    Returns:
        str: Dependency analysis result as JSON
    """
    return analyze_excel_dependencies(file_path)


@mcp.tool()
def validate_formula(formula: str) -> str:
    """Validate the safety and validity of an Excel formula.

    Args:
        formula: Excel formula string

    Returns:
        str: Validation result as JSON
    """
    return validate_excel_formula(formula)
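# The five formula tools above return JSON strings rather than dicts, as their
# docstrings state. A minimal decoding sketch; the payload fields themselves
# are defined by formulas_tools and are not assumed here:
def _decode_formula_result(raw: str) -> dict:
    """Parse a JSON result string from the formula tools; return {} on bad input."""
    try:
        return json.loads(raw)
    except (TypeError, ValueError):
        return {}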
# Enhanced Excel data quality control tools
@mcp.tool()
def enhanced_data_quality_check(file_path: str, quality_level: str = "comprehensive") -> dict:
    """Enhanced Excel data quality check tool.

    Args:
        file_path: Path to the Excel file
        quality_level: Quality check level ("basic", "standard", "comprehensive")

    Returns:
        dict: Data quality check result
    """
    try:
        return data_quality_controller.comprehensive_quality_check(file_path, quality_level)
    except Exception as e:
        return error_handler.create_error_response(
            ErrorType.PROCESSING_ERROR,
            f"Data quality check failed: {str(e)}",
            suggestions=["Check the file format", "Confirm the file is readable"]
        )


@mcp.tool()
def extract_cell_content_advanced(file_path: str, cell_range: Optional[str] = None,
                                  sheet_name: Optional[str] = None,
                                  extract_type: str = "all") -> dict:
    """Advanced cell content extraction tool.

    Args:
        file_path: Path to the Excel file
        cell_range: Cell range (e.g. "A1:C10")
        sheet_name: Worksheet name
        extract_type: Extraction type ("all", "text", "numbers", "formulas", "formatted")

    Returns:
        dict: Extracted cell content
    """
    try:
        return cell_content_extractor.extract_cell_content_advanced(
            file_path, cell_range, sheet_name, extract_type
        )
    except Exception as e:
        return error_handler.create_error_response(
            ErrorType.PROCESSING_ERROR,
            f"Cell content extraction failed: {str(e)}",
            suggestions=["Check the cell range format", "Confirm the worksheet name"]
        )


@mcp.tool()
def convert_character_formats(file_path: str, conversion_rules: dict,
                              output_path: Optional[str] = None) -> dict:
    """Automated character format conversion tool.

    Args:
        file_path: Path to the Excel file
        conversion_rules: Dictionary of conversion rules
        output_path: Output file path

    Returns:
        dict: Conversion result
    """
    try:
        return character_converter.batch_character_conversion(
            file_path, conversion_rules, output_path
        )
    except Exception as e:
        return error_handler.create_error_response(
            ErrorType.PROCESSING_ERROR,
            f"Character format conversion failed: {str(e)}",
            suggestions=["Check the conversion rule format", "Confirm write permission for the output path"]
        )


@mcp.tool()
def extract_multi_condition_data(file_path: str, conditions: list,
                                 sheet_name: Optional[str] = None) -> dict:
    """Multi-condition data extraction tool.

    Args:
        file_path: Path to the Excel file
        conditions: List of conditions
        sheet_name: Worksheet name

    Returns:
        dict: Extracted data
    """
    try:
        return multi_condition_extractor.extract_with_multiple_conditions(
            file_path, conditions, sheet_name
        )
    except Exception as e:
        return error_handler.create_error_response(
            ErrorType.PROCESSING_ERROR,
            f"Multi-condition data extraction failed: {str(e)}",
            suggestions=["Check the condition format", "Confirm the worksheet exists"]
        )


@mcp.tool()
def merge_multiple_tables(file_paths: list, merge_config: dict,
                          output_path: Optional[str] = None) -> dict:
    """Multi-table data merging tool.

    Args:
        file_paths: List of Excel file paths
        merge_config: Merge configuration
        output_path: Output file path

    Returns:
        dict: Merge result
    """
    try:
        return multi_table_merger.merge_multiple_excel_files(
            file_paths, merge_config, output_path
        )
    except Exception as e:
        return error_handler.create_error_response(
            ErrorType.PROCESSING_ERROR,
            f"Table merge failed: {str(e)}",
            suggestions=["Check the file paths", "Confirm the merge configuration"]
        )


@mcp.tool()
def clean_excel_data(file_path: str, cleaning_options: dict,
                     output_path: Optional[str] = None) -> dict:
    """Excel data cleaning tool.

    Args:
        file_path: Path to the Excel file
        cleaning_options: Cleaning options
        output_path: Output file path

    Returns:
        dict: Cleaning result
    """
    try:
        return data_cleaner.clean_excel_data(
            file_path=file_path,
            cleaning_config=cleaning_options,
            output_file=output_path
        )
    except Exception as e:
        return error_handler.create_error_response(
            ErrorType.PROCESSING_ERROR,
            f"Data cleaning failed: {str(e)}",
            suggestions=["Check the cleaning options", "Confirm the file format"]
        )


@mcp.tool()
def batch_process_excel_files(file_paths: list, processing_config: dict) -> dict:
    """Batch Excel file processing tool.

    Args:
        file_paths: List of Excel file paths
        processing_config: Processing configuration

    Returns:
        dict: Batch processing result
    """
    try:
        return batch_processor.batch_process_files(
            file_paths, processing_config
        )
    except Exception as e:
        return error_handler.create_error_response(
            ErrorType.PROCESSING_ERROR,
            f"Batch processing failed: {str(e)}",
            suggestions=["Check the file path list", "Confirm the processing configuration"]
        )
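# Every quality tool above follows the same try/except shape: call into the
# underlying processor and convert any exception into a structured error
# response. A minimal sketch of that pattern as a reusable decorator; it is
# illustrative only, is not used by this module, and returns a plain dict
# rather than delegating to error_handler.create_error_response:
import functools

def _tool_error_boundary(failure_message: str, suggestions: list):
    """Wrap a tool function so exceptions become structured error dicts."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                return {
                    "success": False,
                    "error": f"{failure_message}: {e}",
                    "suggestions": suggestions,
                }
        return wrapper
    return decorator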
if __name__ == "__main__":
    import argparse
    import sys

    # Command-line argument handling
    parser = argparse.ArgumentParser(
        description='ChatExcel MCP Server - Excel data processing server',
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument('--version', action='version', version='ChatExcel MCP Server v2.1.0')
    parser.add_argument('--tools', action='store_true', help='List available tools')
    parser.add_argument('--config', type=str, help='Path to a configuration file')
    parser.add_argument('--health-check', action='store_true', help='Run a health check')
    parser.add_argument('--debug', action='store_true', help='Enable debug mode')

    # Parse command-line arguments
    args = parser.parse_args()

    # Initialize configuration
    if CORE_MODULES_AVAILABLE:
        try:
            if args.config:
                from core.config import load_config_from_file
                config = load_config_from_file(args.config)
            else:
                config = get_config()

            # Enable debug mode
            if args.debug:
                config.logging.level = "DEBUG"
                print("🔧 Debug mode enabled")
        except Exception as e:
            print(f"⚠ Failed to load configuration: {e}")
            config = None
    else:
        config = None

    # Health check
    if args.health_check:
        print("🏥 Running health check...")
        health_status = {
            "core_modules": CORE_MODULES_AVAILABLE,
            "dependency_manager": len(dependency_manager.available_modules),
            "failed_imports": len(dependency_manager.failed_imports),
            "config_loaded": config is not None
        }

        print(f"✓ Core modules: {'available' if health_status['core_modules'] else 'unavailable'}")
        print(f"✓ Available dependency modules: {health_status['dependency_manager']}")
        print(f"⚠ Failed imports: {health_status['failed_imports']}")
        print(f"✓ Configuration: {'loaded' if health_status['config_loaded'] else 'not loaded'}")

        if dependency_manager.failed_imports:
            print("\nFailed module imports:")
            for module in dependency_manager.failed_imports:
                print(f"  - {module}")

        # Check critical directories
        try:
            os.makedirs(CHARTS_DIR, exist_ok=True)
            print(f"✓ Charts directory: {CHARTS_DIR}")
        except Exception as e:
            print(f"⚠ Failed to create charts directory: {e}")

        sys.exit(0)

    # Tool listing requested
    if args.tools:
        print("ChatExcel MCP Server available tools:")
        print("1. read_excel_file - Read Excel files")
        print("2. run_excel_code - Execute Excel data processing code")
        print("3. create_chart - Create charts")
        print("4. export_data - Export data")
        print("5. data_cleaner - Clean data")
        print("6. batch_process_excel_files - Batch-process Excel files")
        print("7. read_metadata - Read file metadata")
        print("8. convert_character_formats - Convert character formats")
        print("9. extract_multi_condition_data - Multi-condition data extraction")
        print("10. merge_multiple_tables - Merge multiple tables")
        print("...plus 21 more specialized Excel processing tools")
        print("\nTotal: 31 tools")
        print("Supported formats: xlsx, xls, csv, json, html")

        if CORE_MODULES_AVAILABLE and config:
            print("\nConfiguration:")
            print(f"  - Max file size: {config.security.max_file_size} bytes")
            print(f"  - Execution timeout: {config.performance.execution_timeout} seconds")
            print(f"  - Cache enabled: {config.performance.enable_cache}")

        sys.exit(0)

    # Ensure required directories exist
    try:
        os.makedirs(CHARTS_DIR, exist_ok=True)
        print(f"✓ Charts directory ready: {CHARTS_DIR}")
    except Exception as e:
        print(f"⚠ Failed to create charts directory: {e}")

    # Startup banner
    print("🚀 Starting ChatExcel MCP Server...")
    print("📊 Tool count: 31")
    print("📁 Supported formats: xlsx, xls, csv, json, html")
    print(f"🔧 Core modules: {'loaded' if CORE_MODULES_AVAILABLE else 'fallback mode'}")
    print(f"⚙️ Dependency modules: {len(dependency_manager.available_modules)} available")

    if dependency_manager.failed_imports:
        print(f"⚠️ Failed imports: {len(dependency_manager.failed_imports)} modules")

    if CORE_MODULES_AVAILABLE and config:
        print("🛡️ Security configuration: enabled")
        print("⚡ Performance optimization: enabled")

    print("\nStarting server...")

    try:
        mcp.run()
    except KeyboardInterrupt:
        print("\n👋 Server stopped")
    except Exception as e:
        print(f"\n❌ Server failed to start: {e}")
        if args.debug:
            import traceback
            traceback.print_exc()
        sys.exit(1)


# Fallback implementations
class FallbackVariableLifecycleManager:
    def __init__(self):
        self.variable_registry = {}
        self.creation_order = []

    def register_variable(self, name, value, source='user'):
        self.variable_registry[name] = {
            'value': value,
            'source': source,
            'type': type(value).__name__
        }
        if name not in self.creation_order:
            self.creation_order.append(name)

    def get_dataframes(self):
        return [k for k, v in self.variable_registry.items()
                if hasattr(v.get('value'), 'shape')]


class FallbackNameErrorHandler:
    def __init__(self):
        pass

    def handle_name_error(self, error, code, local_vars):
        return {'success': False, 'error': str(error)}


class FallbackColumnChecker:
    def __init__(self):
        pass

    def match_column(self, missing_column, available_columns):
        return {
            'exact_match': None,
            'case_insensitive_match': None,
            'normalized_matches': []
        }

    def generate_code_suggestions(self, missing_column, match_result):
        return []


def fallback_enhanced_execute_with_error_handling(code, local_vars, var_manager=None, error_handler=None):
    try:
        exec(code, local_vars)
        return {'success': True, 'variables': local_vars}
    except Exception as e:
        return {'success': False, 'error': str(e)}
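# A quick sanity sketch of the fallback executor above (illustrative; the
# expected output follows directly from exec() semantics):
#
#     >>> out = fallback_enhanced_execute_with_error_handling("x = 1 + 1", {})
#     >>> out['success'], out['variables']['x']
#     (True, 2)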
def fallback_analyze_code_variables(code):
    # Simple variable analysis
    import re

    # Find variable assignments
    assignments = re.findall(r'(\w+)\s*=', code)

    # Find variable references
    references = re.findall(r'(\w+)', code)

    return {
        'defined_variables': list(set(assignments)),
        'referenced_variables': list(set(references)),
        'variables': list(set(assignments + references)),
        'assignments': assignments
    }


def fallback_enhanced_run_excel_code(file_path, code, **kwargs):
    """Enhanced fallback for Excel code execution, with a built-in pandas import fix."""
    import pandas as pd
    import numpy as np
    import traceback

    try:
        # Extract parameters
        sheet_name = kwargs.get('sheet_name')
        skiprows = kwargs.get('skiprows')
        header = kwargs.get('header')
        usecols = kwargs.get('usecols')

        # Build the read parameters
        read_kwargs = {}
        if sheet_name is not None:
            read_kwargs['sheet_name'] = sheet_name
        if skiprows is not None:
            read_kwargs['skiprows'] = skiprows
        if header is not None:
            read_kwargs['header'] = header
        if usecols is not None:
            read_kwargs['usecols'] = usecols

        # Read the Excel file
        df = pd.read_excel(file_path, **read_kwargs)

        # If a dict is returned (multiple worksheets), take the first one
        if isinstance(df, dict):
            first_sheet = list(df.keys())[0]
            df = df[first_sheet]

        # Build an enhanced sandboxed execution environment
        safe_globals = {
            # pandas - multiple aliases for compatibility
            'pd': pd,
            'pandas': pd,
            'DataFrame': pd.DataFrame,
            'Series': pd.Series,
            # numpy
            'np': np,
            'numpy': np,
            # data
            'df': df.copy() if hasattr(df, 'copy') else df,
            'data': df.copy() if hasattr(df, 'copy') else df,
            'df_original': df.copy() if hasattr(df, 'copy') else df,
            # file path
            'file_path': file_path,
            # common builtins
            'len': len, 'sum': sum, 'max': max, 'min': min,
            'abs': abs, 'round': round, 'sorted': sorted,
            'enumerate': enumerate, 'zip': zip, 'range': range,
            'list': list, 'dict': dict, 'set': set, 'tuple': tuple,
            'str': str, 'int': int, 'float': float, 'bool': bool,
            'print': print, 'type': type, 'isinstance': isinstance,
            'hasattr': hasattr, 'getattr': getattr,
            # expose __import__ so user code can import modules
            '__import__': __import__,
        }

        # If the code uses df_processed without defining it, seed it from df
        if 'df_processed' in code and 'df_processed =' not in code:
            code = f"df_processed = df.copy()\n{code}"

        # Execute the code
        exec(code, safe_globals)

        return {
            'success': True,
            'variables': {k: v for k, v in safe_globals.items() if not k.startswith('_')},
            'data_shape': df.shape if hasattr(df, 'shape') else None,
            'message': 'Code executed successfully (enhanced fallback implementation)'
        }
    except Exception as e:
        return {
            'success': False,
            'error': f"Code execution failed: {str(e)}",
            'error_type': type(e).__name__,
            'traceback': traceback.format_exc(),
            'errors': [{
                'message': f"Code execution failed: {str(e)}",
                'code': 'ErrorCode.CODE_EXECUTION_ERROR'
            }],
            'warnings': []
        }
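# Illustrative only: how the enhanced fallback executor above is expected to
# behave. "report.xlsx" is a hypothetical file; the keys shown are the ones
# the function itself returns.
#
#     result = fallback_enhanced_run_excel_code(
#         "report.xlsx",
#         "df_processed = df.head(10)",
#         sheet_name=0,
#     )
#     if result['success']:
#         print(result['data_shape'], result['message'])
#     else:
#         print(result['error_type'], result['error'])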
