Excel MCP Server

by fish0710
Verified
import logging import sys import os from typing import Any, List, Dict from mcp.server.fastmcp import FastMCP # Import exceptions from excel_mcp.exceptions import ( ValidationError, WorkbookError, SheetError, DataError, FormattingError, CalculationError, PivotError, ChartError ) # Import from excel_mcp package with consistent _impl suffixes from excel_mcp.validation import ( validate_formula_in_cell_operation as validate_formula_impl, validate_range_in_sheet_operation as validate_range_impl ) from excel_mcp.chart import create_chart_in_sheet as create_chart_impl from excel_mcp.workbook import get_workbook_info from excel_mcp.data import write_data from excel_mcp.pivot import create_pivot_table as create_pivot_table_impl from excel_mcp.sheet import ( copy_sheet, delete_sheet, rename_sheet, merge_range, unmerge_range, ) # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[ logging.StreamHandler(sys.stdout), logging.FileHandler("excel-mcp.log") ], force=True ) logger = logging.getLogger("excel-mcp") # Get Excel files path from environment or use default EXCEL_FILES_PATH = os.environ.get("EXCEL_FILES_PATH", "./excel_files") # Create the directory if it doesn't exist os.makedirs(EXCEL_FILES_PATH, exist_ok=True) # Initialize FastMCP server mcp = FastMCP( "excel-mcp", version="0.1.0", description="Excel MCP Server for manipulating Excel files", dependencies=["openpyxl>=3.1.2"], env_vars={ "EXCEL_FILES_PATH": { "description": "Path to Excel files directory", "required": False, "default": EXCEL_FILES_PATH } } ) def get_excel_path(filename: str) -> str: """Get full path to Excel file. Args: filename: Name of Excel file Returns: Full path to Excel file """ # If filename is already an absolute path, return it if os.path.isabs(filename): return filename # Use the configured Excel files path return os.path.join(EXCEL_FILES_PATH, filename) @mcp.tool() def apply_formula( filepath: str, sheet_name: str, cell: str, formula: str, ) -> str: """Apply Excel formula to cell.""" try: full_path = get_excel_path(filepath) # First validate the formula validation = validate_formula_impl(full_path, sheet_name, cell, formula) if isinstance(validation, dict) and "error" in validation: return f"Error: {validation['error']}" # If valid, apply the formula from excel_mcp.calculations import apply_formula as apply_formula_impl result = apply_formula_impl(full_path, sheet_name, cell, formula) return result["message"] except (ValidationError, CalculationError) as e: return f"Error: {str(e)}" except Exception as e: logger.error(f"Error applying formula: {e}") raise @mcp.tool() def validate_formula_syntax( filepath: str, sheet_name: str, cell: str, formula: str, ) -> str: """Validate Excel formula syntax without applying it.""" try: full_path = get_excel_path(filepath) result = validate_formula_impl(full_path, sheet_name, cell, formula) return result["message"] except (ValidationError, CalculationError) as e: return f"Error: {str(e)}" except Exception as e: logger.error(f"Error validating formula: {e}") raise @mcp.tool() def format_range( filepath: str, sheet_name: str, start_cell: str, end_cell: str = None, bold: bool = False, italic: bool = False, underline: bool = False, font_size: int = None, font_color: str = None, bg_color: str = None, border_style: str = None, border_color: str = None, number_format: str = None, alignment: str = None, wrap_text: bool = False, merge_cells: bool = False, protection: Dict[str, Any] = None, conditional_format: Dict[str, Any] = None ) -> str: """Apply formatting to a range of cells.""" try: full_path = get_excel_path(filepath) from excel_mcp.formatting import format_range as format_range_func result = format_range_func( filepath=full_path, sheet_name=sheet_name, start_cell=start_cell, end_cell=end_cell, bold=bold, italic=italic, underline=underline, font_size=font_size, font_color=font_color, bg_color=bg_color, border_style=border_style, border_color=border_color, number_format=number_format, alignment=alignment, wrap_text=wrap_text, merge_cells=merge_cells, protection=protection, conditional_format=conditional_format ) return "Range formatted successfully" except (ValidationError, FormattingError) as e: return f"Error: {str(e)}" except Exception as e: logger.error(f"Error formatting range: {e}") raise @mcp.tool() def read_data_from_excel( filepath: str, sheet_name: str, start_cell: str = "A1", end_cell: str = None, preview_only: bool = False ) -> str: """Read data from Excel worksheet.""" try: full_path = get_excel_path(filepath) from excel_mcp.data import read_excel_range result = read_excel_range(full_path, sheet_name, start_cell, end_cell, preview_only) if not result: return "No data found in specified range" # Convert the list of dicts to a formatted string data_str = "\n".join([str(row) for row in result]) return data_str except Exception as e: logger.error(f"Error reading data: {e}") raise @mcp.tool() def write_data_to_excel( filepath: str, sheet_name: str, data: List[Dict], start_cell: str = "A1", write_headers: bool = True, ) -> str: """Write data to Excel worksheet.""" try: full_path = get_excel_path(filepath) result = write_data(full_path, sheet_name, data, start_cell, write_headers) return result["message"] except (ValidationError, DataError) as e: return f"Error: {str(e)}" except Exception as e: logger.error(f"Error writing data: {e}") raise @mcp.tool() def create_workbook(filepath: str, upload: bool = True) -> str: """Create new Excel workbook and optionally upload it to file server.""" try: full_path = get_excel_path(filepath) from excel_mcp.workbook import create_workbook as create_workbook_impl result = create_workbook_impl(full_path, upload=upload) if upload and "file_url" in result: return f"Created workbook at {full_path} and uploaded to {result['file_url']}" else: return f"Created workbook at {full_path}" except WorkbookError as e: return f"Error: {str(e)}" except Exception as e: logger.error(f"Error creating workbook: {e}") raise @mcp.tool() def create_worksheet(filepath: str, sheet_name: str) -> str: """Create new worksheet in workbook.""" try: full_path = get_excel_path(filepath) from excel_mcp.workbook import create_sheet as create_worksheet_impl result = create_worksheet_impl(full_path, sheet_name) return result["message"] except (ValidationError, WorkbookError) as e: return f"Error: {str(e)}" except Exception as e: logger.error(f"Error creating worksheet: {e}") raise @mcp.tool() def create_chart( filepath: str, sheet_name: str, data_range: str, chart_type: str, target_cell: str, title: str = "", x_axis: str = "", y_axis: str = "" ) -> str: """Create chart in worksheet.""" try: full_path = get_excel_path(filepath) result = create_chart_impl( filepath=full_path, sheet_name=sheet_name, data_range=data_range, chart_type=chart_type, target_cell=target_cell, title=title, x_axis=x_axis, y_axis=y_axis ) return result["message"] except (ValidationError, ChartError) as e: return f"Error: {str(e)}" except Exception as e: logger.error(f"Error creating chart: {e}") raise @mcp.tool() def create_pivot_table( filepath: str, sheet_name: str, data_range: str, rows: List[str], values: List[str], columns: List[str] = None, agg_func: str = "mean" ) -> str: """Create pivot table in worksheet.""" try: full_path = get_excel_path(filepath) result = create_pivot_table_impl( filepath=full_path, sheet_name=sheet_name, data_range=data_range, rows=rows, values=values, columns=columns or [], agg_func=agg_func ) return result["message"] except (ValidationError, PivotError) as e: return f"Error: {str(e)}" except Exception as e: logger.error(f"Error creating pivot table: {e}") raise @mcp.tool() def copy_worksheet( filepath: str, source_sheet: str, target_sheet: str ) -> str: """Copy worksheet within workbook.""" try: full_path = get_excel_path(filepath) result = copy_sheet(full_path, source_sheet, target_sheet) return result["message"] except (ValidationError, SheetError) as e: return f"Error: {str(e)}" except Exception as e: logger.error(f"Error copying worksheet: {e}") raise @mcp.tool() def delete_worksheet( filepath: str, sheet_name: str ) -> str: """Delete worksheet from workbook.""" try: full_path = get_excel_path(filepath) result = delete_sheet(full_path, sheet_name) return result["message"] except (ValidationError, SheetError) as e: return f"Error: {str(e)}" except Exception as e: logger.error(f"Error deleting worksheet: {e}") raise @mcp.tool() def rename_worksheet( filepath: str, old_name: str, new_name: str ) -> str: """Rename worksheet in workbook.""" try: full_path = get_excel_path(filepath) result = rename_sheet(full_path, old_name, new_name) return result["message"] except (ValidationError, SheetError) as e: return f"Error: {str(e)}" except Exception as e: logger.error(f"Error renaming worksheet: {e}") raise @mcp.tool() def get_workbook_metadata( filepath: str, include_ranges: bool = False ) -> str: """Get metadata about workbook including sheets, ranges, etc.""" try: full_path = get_excel_path(filepath) result = get_workbook_info(full_path, include_ranges=include_ranges) return str(result) except WorkbookError as e: return f"Error: {str(e)}" except Exception as e: logger.error(f"Error getting workbook metadata: {e}") raise @mcp.tool() def merge_cells(filepath: str, sheet_name: str, start_cell: str, end_cell: str) -> str: """Merge a range of cells.""" try: full_path = get_excel_path(filepath) result = merge_range(full_path, sheet_name, start_cell, end_cell) return result["message"] except (ValidationError, SheetError) as e: return f"Error: {str(e)}" except Exception as e: logger.error(f"Error merging cells: {e}") raise @mcp.tool() def unmerge_cells(filepath: str, sheet_name: str, start_cell: str, end_cell: str) -> str: """Unmerge a range of cells.""" try: full_path = get_excel_path(filepath) result = unmerge_range(full_path, sheet_name, start_cell, end_cell) return result["message"] except (ValidationError, SheetError) as e: return f"Error: {str(e)}" except Exception as e: logger.error(f"Error unmerging cells: {e}") raise @mcp.tool() def copy_range( filepath: str, sheet_name: str, source_start: str, source_end: str, target_start: str, target_sheet: str = None ) -> str: """Copy a range of cells to another location.""" try: full_path = get_excel_path(filepath) from excel_mcp.sheet import copy_range_operation result = copy_range_operation( full_path, sheet_name, source_start, source_end, target_start, target_sheet ) return result["message"] except (ValidationError, SheetError) as e: return f"Error: {str(e)}" except Exception as e: logger.error(f"Error copying range: {e}") raise @mcp.tool() def delete_range( filepath: str, sheet_name: str, start_cell: str, end_cell: str, shift_direction: str = "up" ) -> str: """Delete a range of cells and shift remaining cells.""" try: full_path = get_excel_path(filepath) from excel_mcp.sheet import delete_range_operation result = delete_range_operation( full_path, sheet_name, start_cell, end_cell, shift_direction ) return result["message"] except (ValidationError, SheetError) as e: return f"Error: {str(e)}" except Exception as e: logger.error(f"Error deleting range: {e}") raise @mcp.tool() def validate_excel_range( filepath: str, sheet_name: str, start_cell: str, end_cell: str = None ) -> str: """Validate if a range exists and is properly formatted.""" try: full_path = get_excel_path(filepath) range_str = start_cell if not end_cell else f"{start_cell}:{end_cell}" result = validate_range_impl(full_path, sheet_name, range_str) return result["message"] except ValidationError as e: return f"Error: {str(e)}" except Exception as e: logger.error(f"Error validating range: {e}") raise @mcp.tool() def process_excel_from_url(url: str, operation: str, operation_params: Dict[str, Any]) -> str: """Download Excel file from URL, process it with specified operation, and upload back to server. Args: url: URL of the Excel file to download and process operation: Name of the operation to perform (e.g., 'format_range', 'apply_formula') operation_params: Dictionary containing parameters for the specified operation Returns: Result message with the URL of the processed file """ try: # 生成临时文件路径 import tempfile import uuid temp_dir = tempfile.gettempdir() temp_filename = f"temp_excel_{uuid.uuid4()}.xlsx" temp_filepath = os.path.join(temp_dir, temp_filename) # 下载文件 from excel_mcp.workbook import download_file_from_url download_file_from_url(url, temp_filepath) logger.info(f"Downloaded file from {url} to {temp_filepath}") # 获取文件名用于上传后的文件名 original_filename = os.path.basename(url.split('/')[-1]) processed_filename = f"processed_{original_filename}" processed_filepath = os.path.join(EXCEL_FILES_PATH, processed_filename) # 复制文件到Excel文件目录 import shutil shutil.copy2(temp_filepath, processed_filepath) # 打印operation_params logger.info(f"operation_params: {operation_params}") # 确保operation_params是字典类型 if isinstance(operation_params, str): import json try: operation_params = json.loads(operation_params) except json.JSONDecodeError: logger.error(f"Invalid JSON in operation_params: {operation_params}") raise ValueError(f"Invalid JSON format in operation_params") # 执行指定操作 result_message = "" if operation == "format_range": # 替换filepath参数为处理后的文件路径 params = {k: v for k, v in operation_params.items() if k != "filepath"} result_message = format_range(processed_filepath, **params) elif operation == "apply_formula": params = {k: v for k, v in operation_params.items() if k != "filepath"} result_message = apply_formula(processed_filepath, **params) elif operation == "write_data_to_excel": params = {k: v for k, v in operation_params.items() if k != "filepath"} result_message = write_data_to_excel(processed_filepath, **params) elif operation == "create_chart": params = {k: v for k, v in operation_params.items() if k != "filepath"} result_message = create_chart(processed_filepath, **params) elif operation == "create_pivot_table": params = {k: v for k, v in operation_params.items() if k != "filepath"} result_message = create_pivot_table(processed_filepath, **params) else: raise ValueError(f"Unsupported operation: {operation}") # 上传处理后的文件 from excel_mcp.workbook import upload_file_to_server upload_result = upload_file_to_server(processed_filepath) # 清理临时文件 os.remove(temp_filepath) return f"Operation '{operation}' completed successfully. Processed file available at: {upload_result['file_url']}\nOperation result: {result_message}" except Exception as e: logger.error(f"Error processing Excel from URL: {e}") raise WorkbookError(f"Failed to process Excel from URL: {str(e)}") async def run_server(): """Run the Excel MCP server.""" try: logger.info(f"Starting Excel MCP server (files directory: {EXCEL_FILES_PATH})") await mcp.run_sse_async() except KeyboardInterrupt: logger.info("Server stopped by user") await mcp.shutdown() except Exception as e: logger.error(f"Server failed: {e}") raise finally: logger.info("Server shutdown complete")